diff options
Diffstat (limited to 'services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event')
40 files changed, 4329 insertions, 0 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEvent.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEvent.java new file mode 100644 index 000000000..552f949a2 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEvent.java @@ -0,0 +1,342 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * The Class ApexEvent is an event class that external systems use to send events to and receive events from Apex engines. The event itself is a hash map of + * string keys and object values, used to pass data. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class ApexEvent extends HashMap<String, Object> implements Serializable { + private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEvent.class); + + private static final long serialVersionUID = -4451918242101961685L; + + // Holds the next identifier for event execution. + private static AtomicLong nextExecutionID = new AtomicLong(0L); + + /** The name of the Apex event, a mandatory field. All Apex events must have a name so that the event can be looked up in the Apex policy model. */ + public static final String NAME_HEADER_FIELD = "name"; + + /** + * The version of the Apex event, an optional field. If a version is specified on an Apex event, the definition of that version of the event is taken from + * the Apex policy model. If no version is specified, the latest version of the event is used. + */ + public static final String VERSION_HEADER_FIELD = "version"; + + /** + * The name space of the Apex event, an optional field. If a name space is specified on an Apex event it must match the name space on the event definition + * taken from the Apex policy model. If no name space is specified, the name space from the event definition in the Apex policy model is used. + */ + public static final String NAMESPACE_HEADER_FIELD = "nameSpace"; + + /** + * The source of the Apex event, an optional field. It specifies where the Apex event has come from and its use is reserved for now. If no source is + * specified, the source from the event definition in the Apex policy model is used. + */ + public static final String SOURCE_HEADER_FIELD = "source"; + + /** + * The target of the Apex event, an optional field. It specifies where the Apex event is going to and its use is reserved for now. If no target is + * specified, the target from the event definition in the Apex policy model is used. + */ + public static final String TARGET_HEADER_FIELD = "target"; + + /** + * The exception message field of an Apex event is an exception message indicating that an event failed. + */ + public static final String EXCEPTION_MESSAGE_HEADER_FIELD = "exceptionMessage"; + + /** The name of an Apex event must match this regular expression. */ + public static final String NAME_REGEXP = "[A-Za-z0-9\\-_.]+"; + + /** The version of an Apex event must match this regular expression. */ + public static final String VERSION_REGEXP = "[A-Za-z0-9.]+"; + + /** The name space of an Apex event must match this regular expression. */ + public static final String NAMESPACE_REGEXP = "([a-zA_Z_][\\.\\w]*)"; + + /** The source of an Apex event must match this regular expression. */ + public static final String SOURCE_REGEXP = "^$|[A-Za-z0-9\\.\\-_:]+"; + + /** The target of an Apex event must match this regular expression. */ + public static final String TARGET_REGEXP = "^$|[A-Za-z0-9\\.\\-_:]+"; + + // The fields of the event + // @formatter:off + private final String name; + private final String version; + private final String nameSpace; + private final String source; + private final String target; + // @formatter:on + + // An identifier for the current event execution. The default value here will always be unique in a single JVM + private long executionID = ApexEvent.getNextExecutionID(); + + // A string holding a message that indicates why processing of this event threw an exception + private String exceptionMessage; + + /** + * Private utility to get the next candidate value for a Execution ID. This value will always be unique in a single JVM + * + * @return the next candidate value for a Execution ID + */ + private static synchronized long getNextExecutionID() { + return nextExecutionID.getAndIncrement(); + } + + /** + * Instantiates a new apex event. + * + * @param name the name of the event + * @param version the version of the event + * @param nameSpace the name space (java package) of the event + * @param source the source of the event + * @param target the target of the event + * @throws ApexEventException thrown on validation errors on event names and versions + */ + public ApexEvent(final String name, final String version, final String nameSpace, final String source, final String target) throws ApexEventException { + // @formatter:off + this.name = validateField("name", name, NAME_REGEXP); + this.version = validateField("version", version, VERSION_REGEXP); + this.nameSpace = validateField("nameSpace", nameSpace, NAMESPACE_REGEXP); + this.source = validateField("source", source, SOURCE_REGEXP); + this.target = validateField("target", target, TARGET_REGEXP); + // @formatter:on + } + + /** + * Check that a field of the event is valid. + * + * @param fieldName the name of the field to check + * @param fieldValue the value of the field to check + * @param fieldRegexp the regular expression to check the field against + * @return the validated field value + * @throws ApexEventException thrown if the field is invalid + */ + private String validateField(final String fieldName, final String fieldValue, final String fieldRegexp) throws ApexEventException { + if (fieldValue.matches(fieldRegexp)) { + return fieldValue; + } + else { + LOGGER.warn("event \"" + name + ": field \"" + fieldName + "=" + fieldValue + "\" is illegal. It doesn't match regex '" + fieldRegexp + "'"); + throw new ApexEventException("event \"" + name + ": field \"" + fieldName + "=" + fieldValue + "\" is illegal"); + } + } + + /** + * Check that the key of an event is valid. + * + * @param key the key + * @return the string + * @throws ApexEventException the apex event exception + */ + private String validKey(final String key) throws ApexEventException { + if (key.matches(NAME_REGEXP)) { + return key; + } + else { + LOGGER.warn("event \"" + name + ": key \"" + key + "\" is illegal"); + throw new ApexEventException("event \"" + name + ": key \"" + key + "\" is illegal"); + } + } + + /** + * Gets the name. + * + * @return the name + */ + public String getName() { + return name; + } + + /** + * Gets the version. + * + * @return the version + */ + public String getVersion() { + return version; + } + + /** + * Gets the name space. + * + * @return the name space + */ + public String getNameSpace() { + return nameSpace; + } + + /** + * Gets the source. + * + * @return the source + */ + public String getSource() { + return source; + } + + /** + * Gets the target. + * + * @return the target + */ + public String getTarget() { + return target; + } + + /** + * Gets the pass-thru executionID for this event. + * + * @return the executionID + */ + public long getExecutionID() { + return executionID; + } + + /** + * Sets the pass-thru executionID for this event. The default value for executionID will be be unique in the current JVM. For some applications/deployments + * this executionID may need to globally unique + * + * @param executionID the executionID + */ + public void setExecutionID(final long executionID) { + this.executionID = executionID; + } + + /** + * Gets the exception message explaining why processing of this event to fail. + * + * @return the exception message + */ + public String getExceptionMessage() { + return exceptionMessage; + } + + /** + * Sets the exception message explaining why processing of this event to fail. + * + * @param exceptionMessage the exception message + */ + public void setExceptionMessage(final String exceptionMessage) { + this.exceptionMessage = exceptionMessage; + } + + /* + * Map overrides from here + */ + + /* + * (non-Javadoc) + * + * @see java.util.Map#put(java.lang.Object, java.lang.Object) + */ + @Override + public Object put(final String key, final Object value) { + // Check if the key is valid + try { + return super.put(validKey(key), value); + } + catch (final ApexEventException e) { + return null; + } + } + + /* + * (non-Javadoc) + * + * @see java.util.Map#putAll(java.util.Map) + */ + @Override + public void putAll(final Map<? extends String, ? extends Object> incomingMap) { + // Check the keys are valid + try { + for (final String key : incomingMap.keySet()) { + validKey(key); + } + } + catch (final ApexEventException e) { + // One of the keys is invalid + return; + } + + // Go ahead and put everything + super.putAll(incomingMap); + } + + /* + * Object overrides from here + */ + + /* + * (non-Javadoc) + * + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + final StringBuilder builder = new StringBuilder(); + builder.append("name="); + builder.append(name); + builder.append(",version="); + builder.append(version); + builder.append(",nameSpace="); + builder.append(nameSpace); + builder.append(",source="); + builder.append(source); + builder.append(",target="); + builder.append(target); + builder.append(",executionID="); + builder.append(executionID); + builder.append(",exceptionMessage="); + builder.append(exceptionMessage); + builder.append(","); + builder.append("["); + + boolean firstData = true; + for (final Map.Entry<String, Object> dataEntry : this.entrySet()) { + if (firstData) { + firstData = false; + } + else { + builder.append(','); + } + + builder.append(dataEntry.getKey()); + builder.append('='); + builder.append(dataEntry.getValue()); + } + + builder.append("]"); + return builder.toString(); + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventConsumer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventConsumer.java new file mode 100644 index 000000000..53f11dd61 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventConsumer.java @@ -0,0 +1,80 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event; + +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; + +/** + * This interface is used by technology specific consumers and listeners that are are listening for + * or collecting events for input into Apex. Users specify the consumer technology to use in the + * Apex configuration and Apex uses a factory to start the appropriate consumer plugin that + * implements this interface for its input. The technology specific implementation details are + * hidden behind this interface. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public interface ApexEventConsumer { + /** + * Initialize the consumer. + * + * @param name a name for this consumer + * @param consumerParameters the parameters to initialize this consumer + * @param apexEventReceiver the apex event receiver that should be used to pass events received + * by the consumer into Apex + * @throws ApexEventException container exception on errors initializing event handling + */ + void init(String name, EventHandlerParameters consumerParameters, ApexEventReceiver apexEventReceiver) + throws ApexEventException; + + /** + * Start the consumer, start input of events into Apex. + */ + void start(); + + /** + * Get the peered reference object for this consumer. + * + * @param peeredMode the peered mode for which to return the reference + * @return the peered reference object for this consumer + */ + PeeredReference getPeeredReference(EventHandlerPeeredMode peeredMode); + + /** + * Set the peered reference object for this consumer. + * + * @param peeredMode the peered mode for which to return the reference + * @param peeredReference the peered reference object for this consumer + */ + void setPeeredReference(EventHandlerPeeredMode peeredMode, PeeredReference peeredReference); + + /** + * Get the name of this event consumer. + * + * @return the event consumer name + */ + String getName(); + + /** + * Stop the event consumer. + */ + void stop(); +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventConverter.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventConverter.java new file mode 100644 index 000000000..11f005ddf --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventConverter.java @@ -0,0 +1,55 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event; + +import java.util.List; + +import org.onap.policy.apex.model.basicmodel.concepts.ApexException; + +/** + * The Interface ApexEventConverter is used for applications that want to convert arbitrary event + * types to and from Apex events. Application implement this interface to convert their events to + * and from Apex events.The Apex service can then use this interface to transparently transfer + * events into and out of an Apex system. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public interface ApexEventConverter { + + /** + * Convert an event of arbitrary type into an Apex event. + * + * @param name the name of the incoming event + * @param eventOfOtherType the event of some other type to convert + * @return the apex event + * @throws ApexException thrown on conversion errors + */ + List<ApexEvent> toApexEvent(String name, Object eventOfOtherType) throws ApexException; + + /** + * Convert an Apex event into an event of arbitrary type {@code OTHER_EVENT_TYPE}. + * + * @param apexEvent the apex event to convert + * @return the event converted into the other type + * @throws ApexException thrown on conversion errors + */ + Object fromApexEvent(ApexEvent apexEvent) throws ApexException; +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventException.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventException.java new file mode 100644 index 000000000..24f57d741 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventException.java @@ -0,0 +1,51 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event; + +import org.onap.policy.apex.model.basicmodel.concepts.ApexException; + +/** + * This class will be called if an error occurs in handling Apex events. + * + * @author eeilfn + */ +public class ApexEventException extends ApexException { + private static final long serialVersionUID = -4245694568321686450L; + + /** + * Instantiates a new apex event exception. + * + * @param message the message + */ + public ApexEventException(final String message) { + super(message); + } + + /** + * Instantiates a new apex event exception. + * + * @param message the message + * @param e the e + */ + public ApexEventException(final String message, final Exception e) { + super(message, e); + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventList.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventList.java new file mode 100644 index 000000000..9fe03ef47 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventList.java @@ -0,0 +1,32 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event; + +import java.util.ArrayList; + +/** + * The Class ApexEventList holds a list of APEX events. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class ApexEventList extends ArrayList<ApexEvent> { + private static final long serialVersionUID = -8496211897512202896L; +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventProducer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventProducer.java new file mode 100644 index 000000000..414fbc9e3 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventProducer.java @@ -0,0 +1,81 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event; + +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; + +/** + * This interface is used by technology specific producers and publishers that are handling events + * output by Apex. Users specify the producer technology to use in the Apex configuration and Apex + * uses a factory to start the appropriate producer plugin that implements this interface for its + * output. The technology specific implementation details are hidden behind this interface. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public interface ApexEventProducer { + + /** + * Initialize the producer. + * + * @param name a name for this producer + * @param producerParameters the parameters to initialise this producer + * @throws ApexEventException exception on errors initializing an event producer + */ + void init(String name, EventHandlerParameters producerParameters) throws ApexEventException; + + /** + * Get the peered reference object for this producer. + * + * @param peeredMode the peered mode for which to return the reference + * @return the peered reference object for this producer + */ + PeeredReference getPeeredReference(EventHandlerPeeredMode peeredMode); + + /** + * Set the peered reference object for this producer. + * + * @param peeredMode the peered mode for which to return the reference + * @param peeredReference the peered reference object for this producer + */ + void setPeeredReference(EventHandlerPeeredMode peeredMode, PeeredReference peeredReference); + + /** + * Send an event to the producer. + * + * @param executionId the unique ID that produced this event + * @param eventName The name of the event + * @param event The converted event as an object + */ + void sendEvent(long executionId, String eventName, Object event); + + /** + * Get the name of this event producer. + * + * @return the event producer name + */ + String getName(); + + /** + * Stop the event producer. + */ + void stop(); +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventProtocolConverter.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventProtocolConverter.java new file mode 100644 index 000000000..ec19e65c3 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventProtocolConverter.java @@ -0,0 +1,39 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event; + +import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolParameters; + +/** + * The Interface ApexEventProtocolConverter extends ApexEventConverter to allow + * EventProtocolParameters conversion parameters to be passed to the converter. + * + * @author John Keeney (john.keeney@ericsson.com) + */ +public interface ApexEventProtocolConverter extends ApexEventConverter { + + /** + * Initialise the converter instance with the parameters for the EventProtocol. + * + * @param parameters the parameters for the EventProtocol + */ + void init(EventProtocolParameters parameters); +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventReceiver.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventReceiver.java new file mode 100644 index 000000000..8d7e7bae5 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventReceiver.java @@ -0,0 +1,46 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event; + +/** + * This interface is used by an Apex event consumer {@link ApexEventConsumer} consumer to pass a + * received event to Apex. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public interface ApexEventReceiver { + /** + * Receive an event from a consumer for processing. + * + * @param executionId the unique ID for execution of this event + * @param event the event to receive + * @throws ApexEventException on exceptions receiving an event into Apex + */ + void receiveEvent(long executionId, Object event) throws ApexEventException; + + /** + * Receive an event from a consumer for processing. + * + * @param event the event to receive + * @throws ApexEventException on exceptions receiving an event into Apex + */ + void receiveEvent(Object event) throws ApexEventException; +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventRuntimeException.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventRuntimeException.java new file mode 100644 index 000000000..1a624face --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventRuntimeException.java @@ -0,0 +1,51 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event; + +import org.onap.policy.apex.model.basicmodel.concepts.ApexRuntimeException; + +/** + * This exception will be called if a runtime error occurs in Apex event handling. + * + * @author Liam Fallon + */ +public class ApexEventRuntimeException extends ApexRuntimeException { + private static final long serialVersionUID = -8507246953751956974L; + + /** + * Instantiates a new apex runtime event exception with a message. + * + * @param message the message + */ + public ApexEventRuntimeException(final String message) { + super(message); + } + + /** + * Instantiates a new apex runtime event exception with a message and a caused by exception. + * + * @param message the message + * @param e the exception that caused this exception to be thrown + */ + public ApexEventRuntimeException(final String message, final Exception e) { + super(message, e); + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexPeriodicEventGenerator.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexPeriodicEventGenerator.java new file mode 100644 index 000000000..62663b9f1 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexPeriodicEventGenerator.java @@ -0,0 +1,176 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event; + +import java.util.HashMap; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; + +import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * This class is used to generate periodic events into an Apex engine service. It is used to trigger + * policies that perform housekeeping operations. + * + * @author eeilfn + */ +public class ApexPeriodicEventGenerator extends TimerTask { + private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexPeriodicEventGenerator.class); + + /** The name of the periodic event. */ + public static final String PERIODIC_EVENT_NAME = "PERIODIC_EVENT"; + + /** The version of the periodic event. */ + public static final String PERIODIC_EVENT_VERSION = "0.0.1"; + + /** The name space of the periodic event. */ + public static final String PERIODIC_EVENT_NAMESPACE = "com.ericsson.apex.service.engine.event"; + + /** The source of the periodic event. */ + public static final String PERIODIC_EVENT_SOURCE = "internal"; + + /** The target of the periodic event. */ + public static final String PERIODIC_EVENT_TARGET = "internal"; + + /** + * The field name in the periodic event for the delay between occurrences of the periodic event. + */ + public static final String PERIODIC_DELAY = "PERIODIC_DELAY"; + + /** + * The field name in the periodic event for the time at which the first periodic event will + * occur. + */ + public static final String PERIODIC_FIRST_TIME = "PERIODIC_FIRST_TIME"; + + /** + * The field name in the periodic event for the time at which the last periodic event will + * occur. + */ + public static final String PERIODIC_LAST_TIME = "PERIODIC_LAST_TIME"; + + /** The field name in the periodic event for the time at which the event was sent. */ + public static final String PERIODIC_CURRENT_TIME = "PERIODIC_CURRENT_TIME"; + + /** + * The field name in the periodic event for the number of occurrences of this event that have + * been sent to date, this is a sequence number for the periodic event. + */ + public static final String PERIODIC_EVENT_COUNT = "PERIODIC_EVENT_COUNT"; + + // The Java timer used to send periodic events + private Timer timer = null; + + // The engine service interface we'll send periodic events to + private final EngineServiceEventInterface engineServiceEventInterface; + + // Timing information + private long period = 0; + private long firstEventTime = 0; + private long lastEventTime = 0; + private long eventCount = 0; + + /** + * Constructor, save a reference to the event stream handler. + * + * @param engineServiceEventInterface the engine service event interface on which to send + * periodic events + * @param period The period in milliseconds between events + */ + public ApexPeriodicEventGenerator(final EngineServiceEventInterface engineServiceEventInterface, + final long period) { + // Save the engine service reference and delay + this.engineServiceEventInterface = engineServiceEventInterface; + this.period = period; + + timer = new Timer(ApexPeriodicEventGenerator.class.getSimpleName(), true); + timer.schedule(this, period, period); + } + + /** + * Output the metrics for stream loading. + */ + @Override + public void run() { + final Map<String, Object> periodicEventMap = new HashMap<>(); + + // Record the current event time + final long currentEventTime = System.currentTimeMillis(); + + // Check if this is the first periodic event + if (firstEventTime == 0) { + firstEventTime = currentEventTime; + lastEventTime = currentEventTime; + } + + // Increment the event counter + eventCount++; + + // Set the fields in the periodic event + periodicEventMap.put(PERIODIC_DELAY, period); + periodicEventMap.put(PERIODIC_FIRST_TIME, firstEventTime); + periodicEventMap.put(PERIODIC_LAST_TIME, lastEventTime); + periodicEventMap.put(PERIODIC_CURRENT_TIME, currentEventTime); + periodicEventMap.put(PERIODIC_EVENT_COUNT, eventCount); + + // Send the periodic event + try { + final ApexEvent periodicEvent = new ApexEvent(PERIODIC_EVENT_NAME, PERIODIC_EVENT_VERSION, + PERIODIC_EVENT_NAMESPACE, PERIODIC_EVENT_SOURCE, PERIODIC_EVENT_TARGET); + periodicEvent.putAll(periodicEventMap); + engineServiceEventInterface.sendEvent(periodicEvent); + } catch (final ApexEventException e) { + LOGGER.warn("could not send Apex periodic event " + PERIODIC_EVENT_NAME + ":" + PERIODIC_EVENT_VERSION, e); + return; + } + + // Save the current time as the last time + lastEventTime = currentEventTime; + } + + /** + * Cancel the timer. + * + * @return true, if cancel + */ + @Override + public boolean cancel() { + // Cancel the timer + if (timer != null) { + timer.cancel(); + } + return true; + } + + /* + * (non-Javadoc) + * + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + return "ApexPeriodicEventGenerator [period=" + period + ", firstEventTime=" + firstEventTime + + ", lastEventTime=" + lastEventTime + ", eventCount=" + eventCount + "]"; + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/PeeredReference.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/PeeredReference.java new file mode 100644 index 000000000..9560a834c --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/PeeredReference.java @@ -0,0 +1,70 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event; + +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; + +/** + * This class holds a reference to an event consumer and producer that have been peered. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class PeeredReference { + // The consumer putting events into APEX + private final ApexEventConsumer peeredConsumer; + + // The synchronous producer taking events out of APEX + private final ApexEventProducer peeredProducer; + + /** + * Create a peered consumer/producer reference + * + * @param peeredMode the peered mode for which to return the reference + * @param consumer the consumer that is receiving event + * @param producer the producer that is sending events + */ + public PeeredReference(final EventHandlerPeeredMode peeredMode, final ApexEventConsumer consumer, final ApexEventProducer producer) { + this.peeredConsumer = consumer; + this.peeredProducer = producer; + + // Set the peered reference on the producer and consumer + peeredConsumer.setPeeredReference(peeredMode, this); + peeredProducer.setPeeredReference(peeredMode, this); + } + + /** + * Gets the synchronous consumer putting events into the cache. + * + * @return the source synchronous consumer + */ + public ApexEventConsumer getPeeredConsumer() { + return peeredConsumer; + } + + /** + * Gets the synchronous producer taking events from the cache. + * + * @return the synchronous producer that is taking events from the cache + */ + public ApexEventProducer getPeeredProducer() { + return peeredProducer; + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/SynchronousEventCache.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/SynchronousEventCache.java new file mode 100644 index 000000000..25f92d843 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/SynchronousEventCache.java @@ -0,0 +1,294 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event; + +import java.util.AbstractMap.SimpleEntry; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * This class holds a cache of the synchronous events sent into Apex and that have not yet been replied to. It runs a thread to time out events that have not + * been replied to in the specified timeout. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class SynchronousEventCache extends PeeredReference implements Runnable { + // Get a reference to the logger + private static final XLogger LOGGER = XLoggerFactory.getXLogger(SynchronousEventCache.class); + + // The default amount of time to wait for a synchronous event to be replied to is 1 second + private static final long DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT = 1000; + + // The timeout to wait between event polls in milliseconds and the time to wait for the thread to stop + private static final long OUTSTANDING_EVENT_POLL_TIMEOUT = 50; + private static final long CACHE_STOP_WAIT_INTERVAL = 10; + + // The time in milliseconds to wait for the reply to a sent synchronous event + private long synchronousEventTimeout = DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT; + + // Map holding outstanding synchronous events + private final Map<Long, SimpleEntry<Long, Object>> toApexEventMap = new HashMap<Long, SimpleEntry<Long, Object>>(); + + // Map holding reply events + private final Map<Long, SimpleEntry<Long, Object>> fromApexEventMap = new HashMap<Long, SimpleEntry<Long, Object>>(); + + // The message listener thread and stopping flag + private final Thread synchronousEventCacheThread; + private boolean stopOrderedFlag = false; + + /** + * Create a synchronous event cache that caches outstanding synchronous Apex events. + * + * @param peeredMode the peered mode for which to return the reference + * @param consumer the consumer that is populating the cache + * @param producer the producer that is emptying the cache + * @param synchronousEventTimeout the time in milliseconds to wait for the reply to a sent synchronous event + */ + public SynchronousEventCache(final EventHandlerPeeredMode peeredMode, final ApexEventConsumer consumer, final ApexEventProducer producer, final long synchronousEventTimeout) { + super(peeredMode, consumer, producer); + + if (synchronousEventTimeout != 0) { + this.synchronousEventTimeout = synchronousEventTimeout; + } + else { + this.synchronousEventTimeout = DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT; + } + + // Start scanning the outstanding events + synchronousEventCacheThread = new Thread(this); + synchronousEventCacheThread.setDaemon(true); + synchronousEventCacheThread.start(); + } + + /** + * Gets the timeout value for synchronous events. + * + * @return the synchronous event timeout + */ + public long getSynchronousEventTimeout() { + return synchronousEventTimeout; + } + + /** + * Cache a synchronized event sent into Apex in the event cache. + * + * @param executionId the execution ID that was assigned to the event + * @param event the apex event + */ + public void cacheSynchronizedEventToApex(final long executionId, final Object event) { + // Add the event to the map + synchronized (toApexEventMap) { + cacheSynchronizedEvent(toApexEventMap, executionId, event); + } + } + + /** + * Remove the record of an event sent to Apex if it exists in the cache. + * + * @param executionId the execution ID of the event + * @return The removed event + */ + public Object removeCachedEventToApexIfExists(final long executionId) { + synchronized (toApexEventMap) { + return removeCachedEventIfExists(toApexEventMap, executionId); + } + } + + /** + * Check if an event exists in the to apex cache. + * + * @param executionId the execution ID of the event + * @return true if the event exists, false otherwise + */ + public boolean existsEventToApex(final long executionId) { + synchronized (toApexEventMap) { + return toApexEventMap.containsKey(executionId); + } + } + + /** + * Cache synchronized event received from Apex in the event cache. + * + * @param executionId the execution ID of the event + * @param event the apex event + */ + public void cacheSynchronizedEventFromApex(final long executionId, final Object event) { + // Add the event to the map + synchronized (fromApexEventMap) { + cacheSynchronizedEvent(fromApexEventMap, executionId, event); + } + } + + /** + * Remove the record of an event received from Apex if it exists in the cache. + * + * @param executionId the execution ID of the event + * @return The removed event + */ + public Object removeCachedEventFromApexIfExists(final long executionId) { + synchronized (fromApexEventMap) { + return removeCachedEventIfExists(fromApexEventMap, executionId); + } + } + + /** + * Check if an event exists in the from apex cache. + * + * @param executionId the execution ID of the event + * @return true if the event exists, false otherwise + */ + public boolean existsEventFromApex(final long executionId) { + synchronized (fromApexEventMap) { + return fromApexEventMap.containsKey(executionId); + } + } + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + LOGGER.entry(); + + // Periodic scan of outstanding events + while (synchronousEventCacheThread.isAlive() && !stopOrderedFlag) { + ThreadUtilities.sleep(OUTSTANDING_EVENT_POLL_TIMEOUT); + + // Check for timeouts on events + synchronized (toApexEventMap) { + timeoutEventsOnCache(toApexEventMap); + } + synchronized (fromApexEventMap) { + timeoutEventsOnCache(fromApexEventMap); + } + } + + LOGGER.exit(); + } + + /** + * Stops the scanning thread and clears the cache. + */ + public synchronized void stop() { + LOGGER.entry(); + stopOrderedFlag = true; + + while (synchronousEventCacheThread.isAlive()) { + ThreadUtilities.sleep(CACHE_STOP_WAIT_INTERVAL); + } + + // Check if there are any unprocessed events + if (!toApexEventMap.isEmpty()) { + LOGGER.warn(toApexEventMap.size() + " synchronous events dropped due to system shutdown"); + } + + toApexEventMap.clear(); + LOGGER.exit(); + } + + /** + * Cache a synchronized event sent in an event cache. + * @param eventCacheMap the map to cache the event on + * @param executionId the execution ID of the event + * @param event the event to cache + */ + private void cacheSynchronizedEvent(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap, final long executionId, final Object event) { + LOGGER.entry("Adding event with execution ID: " + executionId); + + // Check if the event is already in the cache + if (eventCacheMap.containsKey(executionId)) { + // If there was no sent event then the event timed out or some unexpected event was received + final String errorMessage = "an event with ID " + executionId + + " already exists in the synchronous event cache, execution IDs must be unique in the system"; + LOGGER.warn(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + + // Add the event to the map + eventCacheMap.put(executionId, new SimpleEntry<Long, Object>(System.currentTimeMillis(), event)); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("event has been cached:" + event); + } + + LOGGER.exit("Added: " + executionId); + } + + /** + * Remove the record of an event if it exists in the cache. + * + * @param eventCacheMap the map to remove the event from + * @param executionId the execution ID of the event + * @return The removed event + */ + private Object removeCachedEventIfExists(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap, final long executionId) { + LOGGER.entry("Removing: " + executionId); + + final SimpleEntry<Long, Object> removedEventEntry = eventCacheMap.remove(executionId); + + if (removedEventEntry != null) { + LOGGER.exit("Removed: " + executionId); + return removedEventEntry.getValue(); + } + else { + // The event may not be one of the events in our cache, so we just ignore removal failures + return null; + } + } + + /** + * Time out events on an event cache map. Events that have a timeout longer than the configured timeout are timed out. + * @param eventCacheMap the event cache to operate on + */ + private void timeoutEventsOnCache(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap) { + // Use a set to keep track of the events that have timed out + final Set<Long> timedOutEventSet = new HashSet<>(); + + for (final Entry<Long, SimpleEntry<Long, Object>> cachedEventEntry : eventCacheMap.entrySet()) { + // The amount of time we are waiting for the event reply + final long eventWaitTime = System.currentTimeMillis() - cachedEventEntry.getValue().getKey(); + + // Have we a timeout? + if (eventWaitTime > synchronousEventTimeout) { + timedOutEventSet.add(cachedEventEntry.getKey()); + } + } + + // Remove timed out events from the map + for (final long timedoutEventExecutionID : timedOutEventSet) { + // Remove the map entry and issue a warning + final SimpleEntry<Long, Object> timedOutEventEntry = eventCacheMap.remove(timedoutEventExecutionID); + + LOGGER.warn("synchronous event timed out, reply not received in " + synchronousEventTimeout + " milliseconds on event " + + timedOutEventEntry.getValue()); + } + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventConsumerFactory.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventConsumerFactory.java new file mode 100644 index 000000000..8f54c049b --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventConsumerFactory.java @@ -0,0 +1,83 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl; + +import org.onap.policy.apex.service.engine.event.ApexEventConsumer; +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * This factory class creates event consumers of various technology types for Apex engines. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class EventConsumerFactory { + // The logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(EventConsumerFactory.class); + + /** + * Empty constructor with no generic overloading. + */ + public EventConsumerFactory() {} + + /** + * Create an event consumer of the required type for the specified consumer technology. + * + * @param name the name of the consumer + * @param consumerParameters The parameters for the Apex engine, we use the technology type of + * the required consumer + * @return the event consumer + * @throws ApexEventException on errors creating the Apex event consumer + */ + public ApexEventConsumer createConsumer(final String name, final EventHandlerParameters consumerParameters) + throws ApexEventException { + // Get the carrier technology parameters + final CarrierTechnologyParameters technologyParameters = consumerParameters.getCarrierTechnologyParameters(); + + // Get the class for the event consumer using reflection + final String consumerPluginClass = technologyParameters.getEventConsumerPluginClass(); + Object consumerPluginObject = null; + try { + consumerPluginObject = Class.forName(consumerPluginClass).newInstance(); + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + final String errorMessage = "could not create an Apex event consumer for \"" + name + + "\" for the carrier technology \"" + technologyParameters.getLabel() + + "\", specified event consumer plugin class \"" + consumerPluginClass + "\" not found"; + LOGGER.error(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + + // Check the class is an event consumer + if (!(consumerPluginObject instanceof ApexEventConsumer)) { + final String errorMessage = "could not create an Apex event consumer \"" + name + + "\" for the carrier technology \"" + technologyParameters.getLabel() + + "\", specified event consumer plugin class \"" + consumerPluginClass + + "\" is not an instance of \"" + ApexEventConsumer.class.getCanonicalName() + "\""; + LOGGER.error(errorMessage); + throw new ApexEventException(errorMessage); + } + + return (ApexEventConsumer) consumerPluginObject; + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProducerFactory.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProducerFactory.java new file mode 100644 index 000000000..9bbbad362 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProducerFactory.java @@ -0,0 +1,82 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl; + +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventProducer; +import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * This factory class creates event producers for the defined technology type for Apex engines. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class EventProducerFactory { + // The logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(EventProducerFactory.class); + + /** + * Empty constructor with no generic overloading. + */ + public EventProducerFactory() {} + + /** + * Create an event producer of the required type for the specified producer technology. + * + * @param name the name of the producer + * @param producerParameters The Apex parameters containing the configuration for the producer + * @return the event producer + * @throws ApexEventException on errors creating the Apex event producer + */ + public ApexEventProducer createProducer(final String name, final EventHandlerParameters producerParameters) + throws ApexEventException { + // Get the carrier technology parameters + final CarrierTechnologyParameters technologyParameters = producerParameters.getCarrierTechnologyParameters(); + + // Get the class for the event producer using reflection + final String producerPluginClass = technologyParameters.getEventProducerPluginClass(); + Object producerPluginObject = null; + try { + producerPluginObject = Class.forName(producerPluginClass).newInstance(); + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + final String errorMessage = "could not create an Apex event producer for Producer \"" + name + + "\" for the carrier technology \"" + technologyParameters.getLabel() + + "\", specified event producer plugin class \"" + producerPluginClass + "\" not found"; + LOGGER.error(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + + // Check the class is an event producer + if (!(producerPluginObject instanceof ApexEventProducer)) { + final String errorMessage = "could not create an Apex event producer for Producer \"" + name + + "\" for the carrier technology \"" + technologyParameters.getLabel() + + "\", specified event producer plugin class \"" + producerPluginClass + + "\" is not an instance of \"" + ApexEventProducer.class.getCanonicalName() + "\""; + LOGGER.error(errorMessage); + throw new ApexEventException(errorMessage); + } + + return (ApexEventProducer) producerPluginObject; + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProtocolFactory.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProtocolFactory.java new file mode 100644 index 000000000..85c5bf03f --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProtocolFactory.java @@ -0,0 +1,76 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl; + +import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter; +import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; +import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolParameters; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * This factory class uses the Apex event protocol parameters to create and return an instance of + * the correct Apex event protocol converter plugin for the specified event protocol. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class EventProtocolFactory { + // The logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(EventProtocolFactory.class); + + /** + * Create an event converter that converts between an + * {@link org.onap.policy.apex.service.engine.event.ApexEvent} and the specified event protocol. + * + * @param name the name of the event protocol + * @param eventProtocolParameters the event protocol parameters defining what to convert from + * and to + * @return The event converter for converting events to and from Apex format + */ + public ApexEventProtocolConverter createConverter(final String name, + final EventProtocolParameters eventProtocolParameters) { + // Get the class for the event protocol plugin using reflection + final String eventProtocolPluginClass = eventProtocolParameters.getEventProtocolPluginClass(); + Object eventProtocolPluginObject = null; + try { + eventProtocolPluginObject = Class.forName(eventProtocolPluginClass).newInstance(); + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + final String errorMessage = "could not create an Apex event protocol converter for \"" + name + + "\" for the protocol \"" + eventProtocolParameters.getLabel() + + "\", specified event protocol converter plugin class \"" + eventProtocolPluginClass + + "\" not found"; + LOGGER.error(errorMessage, e); + throw new ApexEventRuntimeException(errorMessage, e); + } + + // Check the class is an event consumer + if (!(eventProtocolPluginObject instanceof ApexEventProtocolConverter)) { + final String errorMessage = "could not create an Apex event protocol converter for \"" + name + + "\" for the protocol \"" + eventProtocolParameters.getLabel() + + "\", specified event protocol converter plugin class \"" + eventProtocolPluginClass + + "\" is not an instance of \"" + ApexEventProtocolConverter.class.getCanonicalName() + "\""; + LOGGER.error(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + ((ApexEventProtocolConverter) eventProtocolPluginObject).init(eventProtocolParameters); + return (ApexEventProtocolConverter) eventProtocolPluginObject; + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/Apex2ApexEventConverter.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/Apex2ApexEventConverter.java new file mode 100644 index 000000000..b73aeb567 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/Apex2ApexEventConverter.java @@ -0,0 +1,140 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.apexprotocolplugin; + +import java.util.ArrayList; +import java.util.List; + +import org.onap.policy.apex.service.engine.event.ApexEvent; +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventList; +import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter; +import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; +import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolParameters; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * The Class Apex2ApexEventConverter passes through {@link ApexEvent} instances. It is used for + * transferring Apex events directly as POJOs between APEX producers and consumers. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class Apex2ApexEventConverter implements ApexEventProtocolConverter { + private static final XLogger LOGGER = XLoggerFactory.getXLogger(Apex2ApexEventConverter.class); + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter#init(org.onap.policy. + * apex. service.parameters.eventprotocol.EventProtocolParameters) + */ + @Override + public void init(final EventProtocolParameters parameters) { + // Check and get the APEX parameters + if (!(parameters instanceof ApexEventProtocolParameters)) { + final String errorMessage = "specified consumer properties are not applicable to the APEX event protocol"; + LOGGER.warn(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.event.ApexEventConverter#toApexEvent(java.lang.String, + * java.lang.Object) + */ + @Override + public List<ApexEvent> toApexEvent(final String eventName, final Object eventObject) throws ApexEventException { + // Check the event eventObject + if (eventObject == null) { + LOGGER.warn("event processing failed, event is null"); + throw new ApexEventException("event processing failed, event is null"); + } + + // The list of events we will return + final List<ApexEvent> eventList = new ArrayList<>(); + + try { + // Check if its a single APEX event + if (!(eventObject instanceof ApexEvent)) { + throw new ApexEventException("incoming event (" + eventObject + ") is not an ApexEvent"); + } + + final ApexEvent event = (ApexEvent) eventObject; + + // Check whether we have any ApexEventList fields, if so this is an event of events and + // all fields should be of type ApexEventList + boolean foundEventListFields = false; + boolean foundOtherFields = false; + for (final Object fieldObject : event.values()) { + if (fieldObject instanceof ApexEventList) { + foundEventListFields = true; + + // Add the events to the event list + eventList.addAll((ApexEventList) fieldObject); + } else { + foundOtherFields = true; + } + } + + // If we found both event list fields and other fields we're in trouble + if (foundEventListFields && foundOtherFields) { + throw new ApexEventException("incoming event (" + eventObject + + ") has both event list fields and other fields, it cannot be processed"); + } + + // Check if the incoming event just has other fields, if so it's just a regular event + // and we add it to the event list as the only event there + if (foundOtherFields) { + eventList.add(event); + } + } catch (final Exception e) { + final String errorString = "Failed to unmarshal APEX event: " + e.getMessage() + ", event=" + eventObject; + LOGGER.warn(errorString, e); + throw new ApexEventException(errorString, e); + } + + // Return the list of events we have unmarshalled + return eventList; + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.event.ApexEventConverter#fromApexEvent(org.onap.policy. + * apex.service.engine.event.ApexEvent) + */ + @Override + public Object fromApexEvent(final ApexEvent apexEvent) throws ApexEventException { + // Check the Apex event + if (apexEvent == null) { + LOGGER.warn("event processing failed, Apex event is null"); + throw new ApexEventException("event processing failed, Apex event is null"); + } + + return apexEvent; + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/ApexEventProtocolParameters.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/ApexEventProtocolParameters.java new file mode 100644 index 000000000..10cd58eb7 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/ApexEventProtocolParameters.java @@ -0,0 +1,58 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.apexprotocolplugin; + +import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolParameters; + +/** + * Event protocol parameters for JSON as an event protocol, there are no user defined parameters. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class ApexEventProtocolParameters extends EventProtocolParameters { + /** The label of this event protocol. */ + public static final String APEX_EVENT_PROTOCOL_LABEL = "APEX"; + + /** + * Constructor to create a JSON event protocol parameter instance and register the instance with + * the parameter service. + */ + public ApexEventProtocolParameters() { + this(ApexEventProtocolParameters.class.getCanonicalName(), APEX_EVENT_PROTOCOL_LABEL); + } + + /** + * Constructor to create an event protocol parameters instance with the name of a sub class of + * this class. + * + * @param parameterClassName the class name of a sub class of this class + * @param eventProtocolLabel the name of the event protocol for this plugin + */ + public ApexEventProtocolParameters(final String parameterClassName, final String eventProtocolLabel) { + super(parameterClassName); + + // Set the event protocol properties for the JSON event protocol + this.setLabel(eventProtocolLabel); + + // Set the event protocol plugin class + this.setEventProtocolPluginClass(Apex2ApexEventConverter.class.getCanonicalName()); + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/package-info.java new file mode 100644 index 000000000..a3c7d0d79 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Contains the implementation of the APEX event protocol converter plugin for events in Json + * format. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +package org.onap.policy.apex.service.engine.event.impl.apexprotocolplugin; diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/ApexEvent2EnEventConverter.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/ApexEvent2EnEventConverter.java new file mode 100644 index 000000000..90a19fff2 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/ApexEvent2EnEventConverter.java @@ -0,0 +1,143 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.enevent; + +import java.util.ArrayList; +import java.util.List; + +import org.onap.policy.apex.service.engine.event.ApexEvent; +import org.onap.policy.apex.service.engine.event.ApexEventConverter; +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +import org.onap.policy.apex.core.engine.engine.ApexEngine; +import org.onap.policy.apex.core.engine.event.EnEvent; +import org.onap.policy.apex.model.basicmodel.concepts.ApexException; +import org.onap.policy.apex.model.basicmodel.service.ModelService; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvent; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvents; + +/** + * The Class ApexEvent2EnEventConverter converts externally facing {@link ApexEvent} instances to + * and from instances of {@link EnEvent} that are used internally in the Apex engine core. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public final class ApexEvent2EnEventConverter implements ApexEventConverter { + private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEvent2EnEventConverter.class); + + // The Apex engine with its event definitions + private final ApexEngine apexEngine; + + /** + * Set up the event converter. + * + * @param apexEngine The engine to use to create events to be converted + */ + public ApexEvent2EnEventConverter(final ApexEngine apexEngine) { + this.apexEngine = apexEngine; + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.event.ApexEventConverter#toApexEvent(java.lang.String, + * java.lang.Object) + */ + @Override + public List<ApexEvent> toApexEvent(final String eventName, final Object event) throws ApexException { + // Check the Engine event + if (event == null) { + LOGGER.warn("event processing failed, engine event is null"); + throw new ApexEventException("event processing failed, engine event is null"); + } + + // Cast the event to an Engine event event, if our conversion is correctly configured, this + // cast should always work + EnEvent enEvent = null; + try { + enEvent = (EnEvent) event; + } catch (final Exception e) { + final String errorMessage = "error transferring event \"" + event + "\" to the Apex engine"; + LOGGER.debug(errorMessage, e); + throw new ApexEventRuntimeException(errorMessage, e); + } + + // Create the Apex event + final AxEvent axEvent = enEvent.getAxEvent(); + final ApexEvent apexEvent = new ApexEvent(axEvent.getKey().getName(), axEvent.getKey().getVersion(), + axEvent.getNameSpace(), axEvent.getSource(), axEvent.getTarget()); + + // Copy the ExecutionID from the EnEvent into the ApexEvent + apexEvent.setExecutionID(enEvent.getExecutionID()); + + // Copy he exception message to the Apex event if it is set + if (enEvent.getExceptionMessage() != null) { + apexEvent.setExceptionMessage(enEvent.getExceptionMessage()); + } + + // Set the data on the apex event + apexEvent.putAll(enEvent); + + // Return the event in a single element + final ArrayList<ApexEvent> eventList = new ArrayList<ApexEvent>(); + eventList.add(apexEvent); + return eventList; + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.event.ApexEventConverter#fromApexEvent(org.onap.policy. + * apex.service.engine.event.ApexEvent) + */ + @Override + public EnEvent fromApexEvent(final ApexEvent apexEvent) throws ApexException { + // Check the Apex model + if (apexEngine == null) { + LOGGER.warn("event processing failed, apex engine is null"); + throw new ApexEventException("event processing failed, apex engine is null"); + } + + // Get the event definition + final AxEvent eventDefinition = ModelService.getModel(AxEvents.class).get(apexEvent.getName()); + if (eventDefinition == null) { + LOGGER.warn("event processing failed, event \"" + apexEvent.getName() + "\" not found in apex model"); + throw new ApexEventException( + "event processing failed, event \"" + apexEvent.getName() + "\" not found in apex model"); + } + + // Create the internal engine event + final EnEvent enEvent = apexEngine.createEvent(eventDefinition.getKey()); + + // Set the data on the engine event + enEvent.putAll(apexEvent); + + // copy the ExecutionID from the ApexEvent into the EnEvent + enEvent.setExecutionID(apexEvent.getExecutionID()); + + return enEvent; + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/package-info.java new file mode 100644 index 000000000..6bc6bc2b3 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/package-info.java @@ -0,0 +1,28 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Provides conversion between externally facing + * {@link org.onap.policy.apex.service.engine.event.ApexEvent} instances and internal + * {@link org.onap.policy.apex.core.engine.event.EnEvent} instances. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +package org.onap.policy.apex.service.engine.event.impl.enevent; diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorCarrierTechnologyParameters.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorCarrierTechnologyParameters.java new file mode 100644 index 000000000..fb722ea2f --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorCarrierTechnologyParameters.java @@ -0,0 +1,67 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.eventrequestor; + +import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; + +/** + * This class holds the parameters that allows an output event to to be sent back into APEX as one + * or multiple input events, there are no user defined parameters. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class EventRequestorCarrierTechnologyParameters extends CarrierTechnologyParameters { + // @formatter:off + /** The label of this carrier technology. */ + public static final String EVENT_REQUESTOR_CARRIER_TECHNOLOGY_LABEL = "EVENT_REQUESTOR"; + + /** The producer plugin class for the EVENT_REQUESTOR carrier technology. */ + public static final String EVENT_REQUESTOR_EVENT_PRODUCER_PLUGIN_CLASS = + EventRequestorProducer.class.getCanonicalName(); + + /** The consumer plugin class for the EVENT_REQUESTOR carrier technology. */ + public static final String EVENT_REQUESTOR_EVENT_CONSUMER_PLUGIN_CLASS = + EventRequestorConsumer.class.getCanonicalName(); + // @formatter:on + + /** + * Constructor to create an event requestor carrier technology parameters instance and register + * the instance with the parameter service. + */ + public EventRequestorCarrierTechnologyParameters() { + super(EventRequestorCarrierTechnologyParameters.class.getCanonicalName()); + + // Set the carrier technology properties for the EVENT_REQUESTOR carrier technology + this.setLabel(EVENT_REQUESTOR_CARRIER_TECHNOLOGY_LABEL); + this.setEventProducerPluginClass(EVENT_REQUESTOR_EVENT_PRODUCER_PLUGIN_CLASS); + this.setEventConsumerPluginClass(EVENT_REQUESTOR_EVENT_CONSUMER_PLUGIN_CLASS); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate() + */ + @Override + public String validate() { + return ""; + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorConsumer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorConsumer.java new file mode 100644 index 000000000..b472cc9c7 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorConsumer.java @@ -0,0 +1,218 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.eventrequestor; + +import java.util.EnumMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; +import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; +import org.onap.policy.apex.service.engine.event.ApexEventConsumer; +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventReceiver; +import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; +import org.onap.policy.apex.service.engine.event.PeeredReference; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class implements an Apex event consumer that receives events from its peered event requestor + * producer. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class EventRequestorConsumer implements ApexEventConsumer, Runnable { + // Get a reference to the logger + private static final Logger LOGGER = LoggerFactory.getLogger(EventRequestorConsumer.class); + + // The amount of time to wait in milliseconds between checks that the consumer thread has + // stopped + private static final long EVENT_REQUESTOR_WAIT_SLEEP_TIME = 50; + + // The event receiver that will receive events from this consumer + private ApexEventReceiver eventReceiver; + + // The name for this consumer + private String name = null; + + // The peer references for this event handler + private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = + new EnumMap<>(EventHandlerPeeredMode.class); + + // Temporary request holder for incoming event send requests + private final BlockingQueue<Object> incomingEventRequestQueue = new LinkedBlockingQueue<>(); + + // The consumer thread and stopping flag + private Thread consumerThread; + private boolean stopOrderedFlag = false; + + // The number of events received to date + private int eventsReceived = 0; + + @Override + public void init(final String consumerName, final EventHandlerParameters consumerParameters, + final ApexEventReceiver incomingEventReceiver) throws ApexEventException { + this.eventReceiver = incomingEventReceiver; + this.name = consumerName; + + // Check and get the event requestor consumer properties + if (!(consumerParameters + .getCarrierTechnologyParameters() instanceof EventRequestorCarrierTechnologyParameters)) { + final String errorMessage = + "specified consumer properties are not applicable to event Requestor consumer (" + this.name + ")"; + LOGGER.warn(errorMessage); + throw new ApexEventException(errorMessage); + } + + // Check if we are in peered mode + if (!consumerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) { + final String errorMessage = "event Requestor consumer (" + this.name + + ") must run in peered requestor mode with a event Requestor producer"; + LOGGER.warn(errorMessage); + throw new ApexEventException(errorMessage); + } + + } + + /** + * Receive an incoming event send request from the peered event Requestor producer and queue it + * + * @param eventObject the incoming event to process + * @throws ApexEventRuntimeException on queueing errors + */ + public void processEvent(final Object eventObject) { + // Push the event onto the queue for handling + try { + incomingEventRequestQueue.add(eventObject); + } catch (final Exception e) { + final String errorMessage = + "could not queue request \"" + eventObject + "\" on event Requestor consumer (" + this.name + ")"; + LOGGER.warn(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#start() + */ + @Override + public void start() { + // Configure and start the event reception thread + final String threadName = this.getClass().getName() + ":" + this.name; + consumerThread = new ApplicationThreadFactory(threadName).newThread(this); + consumerThread.setDaemon(true); + consumerThread.start(); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getName() + */ + @Override + public String getName() { + return name; + } + + /** + * Get the number of events received to date + * + * @return the number of events received + */ + public int getEventsReceived() { + return eventsReceived; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getPeeredReference(org.onap. + * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode) + */ + @Override + public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { + return peerReferenceMap.get(peeredMode); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#setPeeredReference(org.onap. + * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode, + * org.onap.policy.apex.service.engine.event.PeeredReference) + */ + @Override + public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { + peerReferenceMap.put(peeredMode, peeredReference); + } + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + // The endless loop that receives events using REST calls + while (consumerThread.isAlive() && !stopOrderedFlag) { + try { + // Take the next event from the queue + final Object eventObject = + incomingEventRequestQueue.poll(EVENT_REQUESTOR_WAIT_SLEEP_TIME, TimeUnit.MILLISECONDS); + if (eventObject == null) { + // Poll timed out, wait again + continue; + } + + // Send the event into Apex + eventReceiver.receiveEvent(eventObject); + + eventsReceived++; + } catch (final InterruptedException e) { + LOGGER.debug("Thread interrupted, Reason {}", e.getMessage()); + Thread.currentThread().interrupt(); + } catch (final Exception e) { + LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e); + } + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.producer.ApexEventConsumer#stop() + */ + @Override + public void stop() { + stopOrderedFlag = true; + + while (consumerThread.isAlive()) { + ThreadUtilities.sleep(EVENT_REQUESTOR_WAIT_SLEEP_TIME); + } + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorProducer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorProducer.java new file mode 100644 index 000000000..4a972f2ce --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorProducer.java @@ -0,0 +1,178 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.eventrequestor; + +import java.util.EnumMap; +import java.util.Map; + +import org.onap.policy.apex.service.engine.event.ApexEventConsumer; +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventProducer; +import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; +import org.onap.policy.apex.service.engine.event.PeeredReference; +import org.onap.policy.apex.service.engine.event.SynchronousEventCache; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Concrete implementation of an Apex event producer that sends one or more events to its peered + * event requestor consumer. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + * + */ +public class EventRequestorProducer implements ApexEventProducer { + private static final Logger LOGGER = LoggerFactory.getLogger(EventRequestorProducer.class); + + // The name for this producer + private String name = null; + + // The peer references for this event handler + private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = + new EnumMap<>(EventHandlerPeeredMode.class); + + // The number of events sent + private int eventsSent = 0; + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#init(java.lang.String, + * org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters) + */ + @Override + public void init(final String producerName, final EventHandlerParameters producerParameters) + throws ApexEventException { + this.name = producerName; + + // Check and get the producer Properties + if (!(producerParameters + .getCarrierTechnologyParameters() instanceof EventRequestorCarrierTechnologyParameters)) { + final String errorMessage = + "specified consumer properties are not applicable to event requestor producer (" + this.name + ")"; + LOGGER.warn(errorMessage); + throw new ApexEventException(errorMessage); + } + + // Check if we are in peered mode + if (!producerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) { + final String errorMessage = "Event Requestor producer (" + this.name + + ") must run in peered requestor mode with a Event Requestor consumer"; + LOGGER.warn(errorMessage); + throw new ApexEventException(errorMessage); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getName() + */ + @Override + public String getName() { + return name; + } + + /** + * Get the number of events sent to date + * + * @return the number of events received + */ + public int getEventsSent() { + return eventsSent; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getPeeredReference(org.onap. + * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode) + */ + @Override + public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { + return peerReferenceMap.get(peeredMode); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#setPeeredReference(org.onap. + * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode, + * org.onap.policy.apex.service.engine.event.PeeredReference) + */ + @Override + public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { + peerReferenceMap.put(peeredMode, peeredReference); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#sendEvent(long, java.lang. + * String, java.lang.Object) + */ + @Override + public void sendEvent(final long executionId, final String eventName, final Object eventObject) { + // Check if this is a synchronized event, if so we have received a reply + final SynchronousEventCache synchronousEventCache = + (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS); + if (synchronousEventCache != null) { + synchronousEventCache.removeCachedEventToApexIfExists(executionId); + } + + // Find the peered consumer for this producer + final PeeredReference peeredRequestorReference = peerReferenceMap.get(EventHandlerPeeredMode.REQUESTOR); + if (peeredRequestorReference != null) { + // Find the event Response Consumer that will handle this request + final ApexEventConsumer consumer = peeredRequestorReference.getPeeredConsumer(); + if (!(consumer instanceof EventRequestorConsumer)) { + final String errorMessage = "send of event to event consumer \"" + + peeredRequestorReference.getPeeredConsumer() + "\" failed," + + " event response consumer is not an instance of EventRequestorConsumer\n" + eventObject; + LOGGER.warn(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + + // Use the consumer to handle this event + final EventRequestorConsumer eventRequstConsumer = (EventRequestorConsumer) consumer; + eventRequstConsumer.processEvent(eventObject); + + eventsSent++; + } else { + // No peered consumer defined + final String errorMessage = "send of event failed, event response consumer is not defined\n" + eventObject; + LOGGER.warn(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#stop() + */ + @Override + public void stop() { + // For event requestor, all the implementation is in the consumer + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/package-info.java new file mode 100644 index 000000000..3b6da08a4 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/package-info.java @@ -0,0 +1,26 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Implements the Event Requestor carrier technology for multiple event input from an output event. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +package org.onap.policy.apex.service.engine.event.impl.eventrequestor; diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/FILECarrierTechnologyParameters.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/FILECarrierTechnologyParameters.java new file mode 100644 index 000000000..f7f25cb9e --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/FILECarrierTechnologyParameters.java @@ -0,0 +1,208 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin; + +import org.onap.policy.apex.model.utilities.ResourceUtils; +import org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer.ApexFileEventConsumer; +import org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.producer.ApexFileEventProducer; +import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; + +/** + * This class holds the parameters that allows transport of events into and out of Apex using files + * and standard input and output. + * + * <p> + * The following parameters are defined: + * <ol> + * <li>fileName: The full path to the file from which to read events or to which to write events. + * <li>standardIO: If this flag is set to true, then standard input is used to read events in or + * standard output is used to write events and the fileName parameter is ignored if present + * <li>standardError: If this flag is set to true, then standard error is used to write events + * <li>streamingMode: If this flag is set to true, then streaming mode is set for reading events and + * event handling will wait on the input stream for events until the stream is closed. If streaming + * model is off, then event reading completes when the end of input is detected. + * <li>startDelay: The amount of milliseconds to wait at startup startup before processing the first + * event. + * </ol> + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class FILECarrierTechnologyParameters extends CarrierTechnologyParameters { + // @formatter:off + /** The label of this carrier technology. */ + public static final String FILE_CARRIER_TECHNOLOGY_LABEL = "FILE"; + + /** The producer plugin class for the FILE carrier technology. */ + public static final String FILE_EVENT_PRODUCER_PLUGIN_CLASS = ApexFileEventProducer.class.getCanonicalName(); + + /** The consumer plugin class for the FILE carrier technology. */ + public static final String FILE_EVENT_CONSUMER_PLUGIN_CLASS = ApexFileEventConsumer.class.getCanonicalName(); + + private String fileName; + private boolean standardIO = false; + private boolean standardError = false; + private boolean streamingMode = false; + private long startDelay = 0; + // @formatter:on + + /** + * Constructor to create a file carrier technology parameters instance and register the instance + * with the parameter service. + */ + public FILECarrierTechnologyParameters() { + super(FILECarrierTechnologyParameters.class.getCanonicalName()); + + // Set the carrier technology properties for the FILE carrier technology + this.setLabel(FILE_CARRIER_TECHNOLOGY_LABEL); + this.setEventProducerPluginClass(FILE_EVENT_PRODUCER_PLUGIN_CLASS); + this.setEventConsumerPluginClass(FILE_EVENT_CONSUMER_PLUGIN_CLASS); + } + + /** + * Gets the file name from which to read or to which to write events. + * + * @return the file name from which to read or to which to write events + */ + public String getFileName() { + return ResourceUtils.getFilePath4Resource(fileName); + } + + /** + * Checks if is standard IO should be used for input or output. + * + * @return true, if standard IO should be used for input or output + */ + public boolean isStandardIO() { + return standardIO; + } + + /** + * Checks if is standard error should be used for output. + * + * @return true, if standard error should be used for output + */ + public boolean isStandardError() { + return standardError; + } + + /** + * Checks if is streaming mode is on. + * + * @return true, if streaming mode is on + */ + public boolean isStreamingMode() { + return streamingMode; + } + + /** + * Sets the file name from which to read or to which to write events. + * + * @param fileName the file name from which to read or to which to write events + */ + public void setFileName(final String fileName) { + this.fileName = fileName; + } + + /** + * Sets if standard IO should be used for event input or output. + * + * @param standardIO if standard IO should be used for event input or output + */ + public void setStandardIO(final boolean standardIO) { + this.standardIO = standardIO; + } + + /** + * Sets if standard error should be used for event output. + * + * @param standardError if standard error should be used for event output + */ + public void setStandardError(final boolean standardError) { + this.standardError = standardError; + } + + /** + * Sets streaming mode. + * + * @param streamingMode the streaming mode value + */ + public void setStreamingMode(final boolean streamingMode) { + this.streamingMode = streamingMode; + } + + /** + * Gets the delay in milliseconds before the plugin starts processing + * + * @return the delay + */ + public long getStartDelay() { + return startDelay; + } + + /** + * Sets the delay in milliseconds before the plugin starts processing + * + * @param startDelay the delay + */ + public void setStartDelay(final long startDelay) { + this.startDelay = startDelay; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters# + * toString() + */ + @Override + public String toString() { + return "FILECarrierTechnologyParameters [fileName=" + fileName + ", standardIO=" + standardIO + + ", standardError=" + standardError + ", streamingMode=" + streamingMode + ", startDelay=" + startDelay + + "]"; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate() + */ + @Override + public String validate() { + final StringBuilder errorMessageBuilder = new StringBuilder(); + + errorMessageBuilder.append(super.validate()); + + if (!standardIO && !standardError && (fileName == null || fileName.trim().length() == 0)) { + errorMessageBuilder.append( + " fileName not specified or is blank or null, it must be specified as a valid file location\n"); + } + + if (standardIO || standardError) { + streamingMode = true; + } + + if (startDelay < 0) { + errorMessageBuilder.append(" startDelay must be zero or a positive number of milliseconds\n"); + } + + return errorMessageBuilder.toString(); + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java new file mode 100644 index 000000000..7521c3a08 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java @@ -0,0 +1,247 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.EnumMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; +import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; +import org.onap.policy.apex.service.engine.event.ApexEventConsumer; +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventReceiver; +import org.onap.policy.apex.service.engine.event.PeeredReference; +import org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.FILECarrierTechnologyParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Concrete implementation an Apex event consumer that reads events from a file. This consumer also + * implements ApexEventProducer and therefore can be used as a synchronous consumer. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class ApexFileEventConsumer implements ApexEventConsumer, Runnable { + + // Get a reference to the logger + private static final Logger LOGGER = LoggerFactory.getLogger(ApexFileEventConsumer.class); + + // The input stream to read events from + private InputStream eventInputStream; + + // The text block reader that will read text blocks from the contents of the file + private TextBlockReader textBlockReader; + + // The event receiver that will receive asynchronous events from this consumer + private ApexEventReceiver eventReceiver = null; + + // The consumer thread and stopping flag + private Thread consumerThread; + + // The name for this consumer + private String consumerName = null; + + // The specific carrier technology parameters for this consumer + private FILECarrierTechnologyParameters fileCarrierTechnologyParameters; + + // The peer references for this event handler + private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = + new EnumMap<>(EventHandlerPeeredMode.class); + + // Holds the next identifier for event execution. + private static AtomicLong nextExecutionID = new AtomicLong(0L); + + /** + * Private utility to get the next candidate value for a Execution ID. This value will always be + * unique in a single JVM + * + * @return the next candidate value for a Execution ID + */ + private static synchronized long getNextExecutionID() { + return nextExecutionID.getAndIncrement(); + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.apps.uservice.consumer.ApexEventConsumer#init(org.onap.policy.apex.apps. + * uservice.consumer.ApexEventReceiver) + */ + @Override + public void init(final String name, final EventHandlerParameters consumerParameters, + final ApexEventReceiver incomingEventReceiver) throws ApexEventException { + this.eventReceiver = incomingEventReceiver; + this.consumerName = name; + + // Get and check the Apex parameters from the parameter service + if (consumerParameters == null) { + final String errorMessage = "Consumer parameters for ApexFileConsumer \"" + consumerName + "\" is null"; + LOGGER.warn(errorMessage); + throw new ApexEventException(errorMessage); + } + + // Check and get the file Properties + if (!(consumerParameters.getCarrierTechnologyParameters() instanceof FILECarrierTechnologyParameters)) { + final String errorMessage = "specified consumer properties for ApexFileConsumer \"" + consumerName + + "\" are not applicable to a File consumer"; + LOGGER.warn(errorMessage); + throw new ApexEventException(errorMessage); + } + fileCarrierTechnologyParameters = + (FILECarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); + + // Open the file producing events + try { + if (fileCarrierTechnologyParameters.isStandardIO()) { + eventInputStream = System.in; + } else { + eventInputStream = new FileInputStream(fileCarrierTechnologyParameters.getFileName()); + } + + // Get an event composer for our event source + textBlockReader = new TextBlockReaderFactory().getTaggedReader(eventInputStream, + consumerParameters.getEventProtocolParameters()); + } catch (final IOException e) { + final String errorMessage = "ApexFileConsumer \"" + consumerName + "\" failed to open file for reading: \"" + + fileCarrierTechnologyParameters.getFileName() + "\""; + LOGGER.warn(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + + if (fileCarrierTechnologyParameters.getStartDelay() > 0) { + ThreadUtilities.sleep(fileCarrierTechnologyParameters.getStartDelay()); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getName() + */ + @Override + public String getName() { + return consumerName; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getPeeredReference(org.onap. + * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode) + */ + @Override + public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { + return peerReferenceMap.get(peeredMode); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#setPeeredReference(org.onap. + * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode, + * org.onap.policy.apex.service.engine.event.PeeredReference) + */ + @Override + public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { + peerReferenceMap.put(peeredMode, peeredReference); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#start() + */ + @Override + public void start() { + // Configure and start the event reception thread + final String threadName = this.getClass().getName() + " : " + consumerName; + consumerThread = new ApplicationThreadFactory(threadName).newThread(this); + consumerThread.setDaemon(true); + consumerThread.start(); + } + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + // Check that we have been initialized in async or sync mode + if (eventReceiver == null) { + LOGGER.warn("\"{}\" has not been initilaized for either asynchronous or synchronous event handling", + consumerName); + return; + } + + // Read the events from the file while there are still events in the file + try { + // Read all the text blocks + TextBlock textBlock; + do { + // Read the text block + textBlock = textBlockReader.readTextBlock(); + + // Process the event from the text block if there is one there + if (textBlock.getText() != null) { + eventReceiver.receiveEvent(getNextExecutionID(), textBlock.getText()); + } + } while (!textBlock.isEndOfText()); + } catch (final Exception e) { + LOGGER.warn("\"" + consumerName + "\" failed to read event from file: \"" + + fileCarrierTechnologyParameters.getFileName() + "\"", e); + } finally { + try { + eventInputStream.close(); + } catch (final IOException e) { + LOGGER.warn("ApexFileConsumer \"" + consumerName + "\" failed to close file: \"" + + fileCarrierTechnologyParameters.getFileName() + "\"", e); + } + } + + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop() + */ + @Override + public void stop() { + try { + eventInputStream.close(); + } catch (final IOException e) { + LOGGER.warn("ApexFileConsumer \"" + consumerName + "\" failed to close file for reading: \"" + + fileCarrierTechnologyParameters.getFileName() + "\"", e); + } + + if (consumerThread.isAlive() && !consumerThread.isInterrupted()) { + consumerThread.interrupt(); + } + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/CharacterDelimitedTextBlockReader.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/CharacterDelimitedTextBlockReader.java new file mode 100644 index 000000000..b286f8afe --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/CharacterDelimitedTextBlockReader.java @@ -0,0 +1,141 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer; + +import java.io.IOException; +import java.io.InputStream; + +import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolTextCharDelimitedParameters; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * The class CharacterDelimitedTextBlockReader reads the next block of text between two character + * tags from an input stream. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class CharacterDelimitedTextBlockReader implements TextBlockReader { + // The logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(CharacterDelimitedTextBlockReader.class); + + // The character tags + private final char startTagChar; + private final char endTagChar; + + // The input stream for text + private InputStream inputStream; + + // Flag indicating we have seen EOF on the stream + private boolean eofOnInputStream = false; + + /** + * Constructor, set the delimiters. + * + * @param startTagChar The start tag for text blocks + * @param endTagChar The end tag for text blocks + */ + public CharacterDelimitedTextBlockReader(final char startTagChar, final char endTagChar) { + this.startTagChar = startTagChar; + this.endTagChar = endTagChar; + } + + /** + * Constructor, set the delimiters from a character delimited event protocol parameter class. + * + * @param charDelimitedParameters the character delimited event protocol parameter class + */ + public CharacterDelimitedTextBlockReader(final EventProtocolTextCharDelimitedParameters charDelimitedParameters) { + this.startTagChar = charDelimitedParameters.getStartChar(); + this.endTagChar = charDelimitedParameters.getEndChar(); + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer.TextBlockReader#init( + * java.io.InputStream) + */ + @Override + public void init(final InputStream incomingInputStream) { + this.inputStream = incomingInputStream; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer.TextBlockReader# + * readTextBlock() + */ + @Override + public TextBlock readTextBlock() throws IOException { + // Check if there was a previous end of a text block with a non-empty text block returned + if (eofOnInputStream) { + return new TextBlock(eofOnInputStream, null); + } + + // The initial nesting level of incoming text blocks is always zero + int nestingLevel = 0; + + // Holder for the text block + final StringBuilder textBlockBuilder = new StringBuilder(); + + // Read the next text block + while (true) { + final char nextChar = (char) inputStream.read(); + + // Check for EOF + if (nextChar == (char) -1) { + eofOnInputStream = true; + break; + } + + if (nextChar == startTagChar) { + nestingLevel++; + } else if (nestingLevel == 0 && !Character.isWhitespace(nextChar)) { + LOGGER.warn("invalid input on consumer: " + nextChar); + continue; + } + + textBlockBuilder.append(nextChar); + + // Check for end of the text block, we have come back to level 0 + if (nextChar == endTagChar) { + if (nestingLevel > 0) { + nestingLevel--; + } + + if (nestingLevel == 0) { + break; + } + } + } + + // Condition the text block and return it + final String textBlock = textBlockBuilder.toString().trim(); + if (textBlock.length() > 0) { + return new TextBlock(eofOnInputStream, textBlock); + } else { + return new TextBlock(eofOnInputStream, null); + } + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/HeaderDelimitedTextBlockReader.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/HeaderDelimitedTextBlockReader.java new file mode 100644 index 000000000..e40bc756c --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/HeaderDelimitedTextBlockReader.java @@ -0,0 +1,167 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; +import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; +import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolTextTokenDelimitedParameters; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * The Class TextBlockReader reads the next block of text from an input stream. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class HeaderDelimitedTextBlockReader implements TextBlockReader, Runnable { + // The logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(HeaderDelimitedTextBlockReader.class); + + // The amount of time to wait for input on the text block reader + private static final long TEXT_BLOCK_DELAY = 250; + + // Tag for the start of a text block + private final String blockStartToken; + + // The input stream for text + private InputStream inputStream; + + // The lines of input read from the input stream + private final Queue<String> textLineQueue = new LinkedBlockingQueue<>(); + + // The thread used to read text from the input stream + private Thread textConsumputionThread; + + // True while EOF has not been seen on input + private boolean eofOnInputStream = false; + + /** + * Constructor, initialize the text block reader. + * + * @param blockStartToken the block start token for the start of a text block + */ + public HeaderDelimitedTextBlockReader(final String blockStartToken) { + this.blockStartToken = blockStartToken; + } + + /** + * Constructor, initialize the text block reader using token delimited event protocol + * parameters. + * + * @param tokenDelimitedParameters the token delimited event protocol parameters + */ + public HeaderDelimitedTextBlockReader(final EventProtocolTextTokenDelimitedParameters tokenDelimitedParameters) { + this.blockStartToken = tokenDelimitedParameters.getDelimiterToken(); + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer.TextBlockReader# + * init( java.io.InputStream) + */ + @Override + public void init(final InputStream incomingInputStream) { + this.inputStream = incomingInputStream; + + // Configure and start the text reading thread + textConsumputionThread = new ApplicationThreadFactory(this.getClass().getName()).newThread(this); + textConsumputionThread.setDaemon(true); + textConsumputionThread.start(); + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer.TextBlockReader# + * readTextBlock() + */ + @Override + public TextBlock readTextBlock() throws IOException { + // Holder for the current text block + final StringBuilder textBlockBuilder = new StringBuilder(); + + // Wait for the timeout period if there is no input + if (!eofOnInputStream && textLineQueue.size() == 0) { + ThreadUtilities.sleep(TEXT_BLOCK_DELAY); + } + + // Scan the lines in the queue + while (textLineQueue.size() > 0) { + // Scroll down in the available lines looking for the start of the text block + if (textLineQueue.peek().startsWith(blockStartToken)) { + // Process the input line header + textBlockBuilder.append(textLineQueue.remove()); + textBlockBuilder.append('\n'); + break; + } else { + LOGGER.warn("invalid input on consumer: " + textLineQueue.remove()); + } + } + + // Get the rest of the text document + while (textLineQueue.size() > 0 && !textLineQueue.peek().startsWith(blockStartToken)) { + textBlockBuilder.append(textLineQueue.remove()); + textBlockBuilder.append('\n'); + } + + // Condition the text block and return it + final String textBlock = textBlockBuilder.toString().trim(); + final boolean endOfText = (eofOnInputStream && textLineQueue.size() == 0 ? true : false); + + if (textBlock.length() > 0) { + return new TextBlock(endOfText, textBlock); + } else { + return new TextBlock(endOfText, null); + } + } + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + final BufferedReader textReader = new BufferedReader(new InputStreamReader(inputStream)); + + try { + // Read the input line by line until we see end of file on the stream + String line; + while ((line = textReader.readLine()) != null) { + textLineQueue.add(line); + } + } catch (final IOException e) { + LOGGER.warn("I/O exception on text input on consumer: ", e); + } finally { + eofOnInputStream = true; + } + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlock.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlock.java new file mode 100644 index 000000000..526d9c318 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlock.java @@ -0,0 +1,78 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer; + +/** + * This class is a bean that holds a block of text read from an incoming text file. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class TextBlock { + private boolean endOfText = false; + private String text; + + /** + * Constructor to initiate the text block. + * + * @param endOfText the end of text + * @param text the text + */ + public TextBlock(final boolean endOfText, final String text) { + this.endOfText = endOfText; + this.text = text; + } + + /** + * Checks if is end of text. + * + * @return true, if checks if is end of text + */ + public boolean isEndOfText() { + return endOfText; + } + + /** + * Sets whether end of text has been reached. + * + * @param endOfText the end of text flag value + */ + public void setEndOfText(final boolean endOfText) { + this.endOfText = endOfText; + } + + /** + * Gets the text of the text block. + * + * @return the text of the text block + */ + public String getText() { + return text; + } + + /** + * Sets the text of the text block. + * + * @param text the text of the text block + */ + public void setText(final String text) { + this.text = text; + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlockReader.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlockReader.java new file mode 100644 index 000000000..627718402 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlockReader.java @@ -0,0 +1,46 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer; + +import java.io.IOException; +import java.io.InputStream; + +/** + * Implementers of the interface TextBlockReader read the next block of text from an input stream. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public interface TextBlockReader { + /** + * Initialize the text block reader reader. + * + * @param inputStream The stream to read from + */ + void init(InputStream inputStream); + + /** + * Read a block of text between two delimiters. + * + * @return The text block + * @throws IOException On reading errors + */ + TextBlock readTextBlock() throws IOException; +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlockReaderFactory.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlockReaderFactory.java new file mode 100644 index 000000000..e48266634 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlockReaderFactory.java @@ -0,0 +1,80 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer; + +import java.io.InputStream; + +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolParameters; +import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolTextCharDelimitedParameters; +import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolTextTokenDelimitedParameters; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * This factory creates text block readers for breaking character streams into blocks of text. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class TextBlockReaderFactory { + // The logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(TextBlockReaderFactory.class); + + /** + * Get a text block reader for the given event protocol. + * + * @param inputStream the input stream that will be used for reading + * @param eventProtocolParameters the parameters that have been specified for event protocols + * @return the tagged reader + * @throws ApexEventException On an unsupported event protocol + */ + public TextBlockReader getTaggedReader(final InputStream inputStream, + final EventProtocolParameters eventProtocolParameters) throws ApexEventException { + // Check the type of event protocol we have + if (eventProtocolParameters instanceof EventProtocolTextCharDelimitedParameters) { + // We have character delimited textual input + final EventProtocolTextCharDelimitedParameters charDelimitedParameters = + (EventProtocolTextCharDelimitedParameters) eventProtocolParameters; + + // Create the text block reader + final TextBlockReader characterDelimitedTextBlockReader = + new CharacterDelimitedTextBlockReader(charDelimitedParameters); + characterDelimitedTextBlockReader.init(inputStream); + return characterDelimitedTextBlockReader; + } else if (eventProtocolParameters instanceof EventProtocolTextTokenDelimitedParameters) { + // We have token delimited textual input + final EventProtocolTextTokenDelimitedParameters tokenDelimitedParameters = + (EventProtocolTextTokenDelimitedParameters) eventProtocolParameters; + + // Create the text block reader + final HeaderDelimitedTextBlockReader headerDelimitedTextBlockReader = + new HeaderDelimitedTextBlockReader(tokenDelimitedParameters); + headerDelimitedTextBlockReader.init(inputStream); + return headerDelimitedTextBlockReader; + } else { + final String errorMessage = + "could not create text block reader for a textual event protocol, the required type " + + eventProtocolParameters.getLabel() + " is not supported"; + LOGGER.error(errorMessage); + throw new ApexEventException(errorMessage); + } + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/package-info.java new file mode 100644 index 000000000..05833ac7c --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Implements the FILE carrier technology consumer that sends events to APEX from files, standard IO + * or named pipes. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer; diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/package-info.java new file mode 100644 index 000000000..de0b1b56e --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Implements the FILE carrier technology for event input and output to and from Apex using files, + * named pipes, and standard IO. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin; diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java new file mode 100644 index 000000000..d5f9ff1b2 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java @@ -0,0 +1,181 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.producer; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.EnumMap; +import java.util.Map; + +import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventProducer; +import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; +import org.onap.policy.apex.service.engine.event.PeeredReference; +import org.onap.policy.apex.service.engine.event.SynchronousEventCache; +import org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.FILECarrierTechnologyParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Concrete implementation of an Apex event producer that sends events to a file. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class ApexFileEventProducer implements ApexEventProducer { + // Get a reference to the logger + private static final Logger LOGGER = LoggerFactory.getLogger(ApexFileEventProducer.class); + + // The name for this producer + private String producerName = null; + + // The output stream to write events to + private PrintStream eventOutputStream; + + // The peer references for this event handler + private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = + new EnumMap<>(EventHandlerPeeredMode.class); + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#init() + */ + @Override + public void init(final String name, final EventHandlerParameters producerParameters) throws ApexEventException { + producerName = name; + + // Get and check the Apex parameters from the parameter service + if (producerParameters == null) { + final String errorMessage = "Producer parameters for ApexFileProducer \"" + producerName + "\" is null"; + LOGGER.warn(errorMessage); + throw new ApexEventException(errorMessage); + } + + // Check and get the file Properties + if (!(producerParameters.getCarrierTechnologyParameters() instanceof FILECarrierTechnologyParameters)) { + final String errorMessage = "specified producer properties for ApexFileProducer \"" + producerName + + "\" are not applicable to a FILE producer"; + LOGGER.warn(errorMessage); + throw new ApexEventException(errorMessage); + } + final FILECarrierTechnologyParameters fileCarrierTechnologyParameters = + (FILECarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); + + // Now we create a writer for events + try { + if (fileCarrierTechnologyParameters.isStandardError()) { + eventOutputStream = System.err; + } else if (fileCarrierTechnologyParameters.isStandardIO()) { + eventOutputStream = System.out; + } else { + eventOutputStream = + new PrintStream(new FileOutputStream(fileCarrierTechnologyParameters.getFileName()), true); + } + } catch (final IOException e) { + final String errorMessage = "ApexFileProducer \"" + producerName + "\" failed to open file for writing: \"" + + fileCarrierTechnologyParameters.getFileName() + "\""; + LOGGER.warn(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + + if (fileCarrierTechnologyParameters.getStartDelay() > 0) { + ThreadUtilities.sleep(fileCarrierTechnologyParameters.getStartDelay()); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getName() + */ + @Override + public String getName() { + return producerName; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getPeeredReference(org.onap. + * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode) + */ + @Override + public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { + return peerReferenceMap.get(peeredMode); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#setPeeredReference(org.onap. + * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode, + * org.onap.policy.apex.service.engine.event.PeeredReference) + */ + @Override + public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { + peerReferenceMap.put(peeredMode, peeredReference); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#sendEvent(long, + * java.lang.String, java.lang.Object) + */ + @Override + public void sendEvent(final long executionId, final String eventName, final Object event) { + // Check if this is a synchronized event, if so we have received a reply + final SynchronousEventCache synchronousEventCache = + (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS); + if (synchronousEventCache != null) { + synchronousEventCache.removeCachedEventToApexIfExists(executionId); + } + + // Cast the event to a string, if our conversion is correctly configured, this cast should + // always work + String stringEvent = null; + try { + stringEvent = (String) event; + } catch (final Exception e) { + final String errorMessage = "error in ApexFileProducer \"" + producerName + "\" while transferring event \"" + + event + "\" to the output stream"; + LOGGER.debug(errorMessage, e); + throw new ApexEventRuntimeException(errorMessage, e); + } + + eventOutputStream.println(stringEvent); + eventOutputStream.flush(); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop() + */ + @Override + public void stop() { + eventOutputStream.close(); + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/package-info.java new file mode 100644 index 000000000..f7d7cbfbd --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Implements the FILE carrier technology producer that outputs events from APEX to files, standard + * IO or named pipes. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.producer; diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/Apex2JSONEventConverter.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/Apex2JSONEventConverter.java new file mode 100644 index 000000000..3b21a29ca --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/Apex2JSONEventConverter.java @@ -0,0 +1,433 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.jsonprotocolplugin; + +import java.util.ArrayList; +import java.util.List; + +import org.onap.policy.apex.context.SchemaHelper; +import org.onap.policy.apex.context.impl.schema.SchemaHelperFactory; +import org.onap.policy.apex.model.basicmodel.service.ModelService; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvent; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvents; +import org.onap.policy.apex.model.eventmodel.concepts.AxField; +import org.onap.policy.apex.service.engine.event.ApexEvent; +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter; +import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; +import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolParameters; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + +/** + * The Class Apex2JSONEventConverter converts {@link ApexEvent} instances to and from JSON string + * representations of Apex events. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class Apex2JSONEventConverter implements ApexEventProtocolConverter { + private static final XLogger LOGGER = XLoggerFactory.getXLogger(Apex2JSONEventConverter.class); + + // The parameters for the JSON event protocol + private JSONEventProtocolParameters jsonPars; + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter#init(org.onap.policy. + * apex.service.parameters.eventprotocol.EventProtocolParameters) + */ + @Override + public void init(final EventProtocolParameters parameters) { + // Check and get the JSON parameters + if (!(parameters instanceof JSONEventProtocolParameters)) { + final String errorMessage = "specified consumer properties are not applicable to the JSON event protocol"; + LOGGER.warn(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + + jsonPars = (JSONEventProtocolParameters) parameters; + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.event.ApexEventConverter#toApexEvent(java.lang.String, + * java.lang.Object) + */ + @Override + public List<ApexEvent> toApexEvent(final String eventName, final Object eventObject) throws ApexEventException { + // Check the event eventObject + if (eventObject == null) { + LOGGER.warn("event processing failed, event is null"); + throw new ApexEventException("event processing failed, event is null"); + } + + // Cast the event to a string, if our conversion is correctly configured, this cast should + // always work + String jsonEventString = null; + try { + jsonEventString = (String) eventObject; + } catch (final Exception e) { + final String errorMessage = "error converting event \"" + eventObject + "\" to a string"; + LOGGER.debug(errorMessage, e); + throw new ApexEventRuntimeException(errorMessage, e); + } + + // The list of events we will return + final List<ApexEvent> eventList = new ArrayList<ApexEvent>(); + + try { + // We may have a single JSON object with a single event or an array of JSON objects + final Object decodedJsonObject = + new GsonBuilder().serializeNulls().create().fromJson(jsonEventString, Object.class); + + // Check if we have a list of objects + if (decodedJsonObject instanceof List) { + // Check if it's a list of JSON objects or a list of strings + @SuppressWarnings("unchecked") + final List<Object> decodedJsonList = (List<Object>) decodedJsonObject; + + // Decode each of the list elements in sequence + for (final Object jsonListObject : decodedJsonList) { + if (jsonListObject instanceof String) { + eventList.add(jsonStringApexEvent(eventName, (String) jsonListObject)); + } else if (jsonListObject instanceof JsonObject) { + eventList.add(jsonObject2ApexEvent(eventName, (JsonObject) jsonListObject)); + } else { + throw new ApexEventException("incoming event (" + jsonEventString + + ") is a JSON object array containing an invalid object " + jsonListObject); + } + } + } else { + eventList.add(jsonStringApexEvent(eventName, jsonEventString)); + } + } catch (final Exception e) { + final String errorString = + "Failed to unmarshal JSON event: " + e.getMessage() + ", event=" + jsonEventString; + LOGGER.warn(errorString, e); + throw new ApexEventException(errorString, e); + } + + // Return the list of events we have unmarshalled + return eventList; + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.event.ApexEventConverter#fromApexEvent(org.onap.policy. + * apex.service.engine.event.ApexEvent) + */ + @Override + public Object fromApexEvent(final ApexEvent apexEvent) throws ApexEventException { + // Check the Apex event + if (apexEvent == null) { + LOGGER.warn("event processing failed, Apex event is null"); + throw new ApexEventException("event processing failed, Apex event is null"); + } + + // Get the event definition for the event from the model service + final AxEvent eventDefinition = + ModelService.getModel(AxEvents.class).get(apexEvent.getName(), apexEvent.getVersion()); + + // Use a GSON Json object to marshal the Apex event to JSON + final Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create(); + final JsonObject jsonObject = new JsonObject(); + + jsonObject.addProperty(ApexEvent.NAME_HEADER_FIELD, apexEvent.getName()); + jsonObject.addProperty(ApexEvent.VERSION_HEADER_FIELD, apexEvent.getVersion()); + jsonObject.addProperty(ApexEvent.NAMESPACE_HEADER_FIELD, apexEvent.getNameSpace()); + jsonObject.addProperty(ApexEvent.SOURCE_HEADER_FIELD, apexEvent.getSource()); + jsonObject.addProperty(ApexEvent.TARGET_HEADER_FIELD, apexEvent.getTarget()); + + if (apexEvent.getExceptionMessage() != null) { + jsonObject.addProperty(ApexEvent.EXCEPTION_MESSAGE_HEADER_FIELD, apexEvent.getExceptionMessage()); + } + + for (final AxField eventField : eventDefinition.getFields()) { + final String fieldName = eventField.getKey().getLocalName(); + + if (!apexEvent.containsKey(fieldName)) { + if (!eventField.getOptional()) { + final String errorMessage = "error parsing " + eventDefinition.getID() + " event to Json. " + + "Field \"" + fieldName + "\" is missing, but is mandatory. Fields: " + apexEvent; + LOGGER.debug(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + continue; + } + + final Object fieldValue = apexEvent.get(fieldName); + + // Get the schema helper + final SchemaHelper fieldSchemaHelper = + new SchemaHelperFactory().createSchemaHelper(eventField.getKey(), eventField.getSchema()); + jsonObject.add(fieldName, fieldSchemaHelper.marshal2JsonElement(fieldValue)); + } + + // Output JSON string in a pretty format + return gson.toJson(jsonObject); + } + + /** + * This method converts a JSON object into an Apex event. + * + * @param eventName the name of the event + * @param jsonEventString the JSON string that holds the event + * @return the apex event that we have converted the JSON object into + * @throws ApexEventException thrown on unmarshaling exceptions + */ + private ApexEvent jsonStringApexEvent(final String eventName, final String jsonEventString) + throws ApexEventException { + // Use GSON to read the event string + final JsonObject jsonObject = + new GsonBuilder().serializeNulls().create().fromJson(jsonEventString, JsonObject.class); + + if (jsonObject == null || !jsonObject.isJsonObject()) { + throw new ApexEventException( + "incoming event (" + jsonEventString + ") is not a JSON object or an JSON object array"); + } + + return jsonObject2ApexEvent(eventName, jsonObject); + } + + /** + * This method converts a JSON object into an Apex event. + * + * @param eventName the name of the event + * @param jsonObject the JSON object that holds the event + * @return the apex event that we have converted the JSON object into + * @throws ApexEventException thrown on unmarshaling exceptions + */ + private ApexEvent jsonObject2ApexEvent(final String eventName, final JsonObject jsonObject) + throws ApexEventException { + // Process the mandatory Apex header + final ApexEvent apexEvent = processApexEventHeader(eventName, jsonObject); + + // Get the event definition for the event from the model service + final AxEvent eventDefinition = + ModelService.getModel(AxEvents.class).get(apexEvent.getName(), apexEvent.getVersion()); + + // Iterate over the input fields in the event + for (final AxField eventField : eventDefinition.getFields()) { + final String fieldName = eventField.getKey().getLocalName(); + if (!hasJSONField(jsonObject, fieldName)) { + if (!eventField.getOptional()) { + final String errorMessage = "error parsing " + eventDefinition.getID() + " event from Json. " + + "Field \"" + fieldName + "\" is missing, but is mandatory."; + LOGGER.debug(errorMessage); + throw new ApexEventException(errorMessage); + } + continue; + } + + final JsonElement fieldValue = getJSONField(jsonObject, fieldName, null, !eventField.getOptional()); + + if (fieldValue != null && !fieldValue.isJsonNull()) { + // Get the schema helper + final SchemaHelper fieldSchemaHelper = + new SchemaHelperFactory().createSchemaHelper(eventField.getKey(), eventField.getSchema()); + apexEvent.put(fieldName, fieldSchemaHelper.createNewInstance(fieldValue)); + } else { + apexEvent.put(fieldName, null); + } + } + return apexEvent; + + } + + /** + * This method processes the event header of an Apex event. + * + * @param eventName the name of the event + * @param jsonObject the JSON object containing the JSON representation of the incoming event + * @return an apex event constructed using the header fields of the event + * @throws ApexEventRuntimeException the apex event runtime exception + * @throws ApexEventException on invalid events with missing header fields + */ + private ApexEvent processApexEventHeader(final String eventName, final JsonObject jsonObject) + throws ApexEventRuntimeException, ApexEventException { + // Get the event header fields + // @formatter:off + String name = getJSONStringField(jsonObject, ApexEvent.NAME_HEADER_FIELD, jsonPars.getNameAlias(), ApexEvent.NAME_REGEXP, false); + String version = getJSONStringField(jsonObject, ApexEvent.VERSION_HEADER_FIELD, jsonPars.getVersionAlias(), ApexEvent.VERSION_REGEXP, false); + String namespace = getJSONStringField(jsonObject, ApexEvent.NAMESPACE_HEADER_FIELD, jsonPars.getNameSpaceAlias(), ApexEvent.NAMESPACE_REGEXP, false); + String source = getJSONStringField(jsonObject, ApexEvent.SOURCE_HEADER_FIELD, jsonPars.getSourceAlias(), ApexEvent.SOURCE_REGEXP, false); + String target = getJSONStringField(jsonObject, ApexEvent.TARGET_HEADER_FIELD, jsonPars.getTargetAlias(), ApexEvent.TARGET_REGEXP, false); + // @formatter:on + + // Check if an event name was specified on the event parameters + if (eventName != null) { + if (name != null && !eventName.equals(name)) { + LOGGER.warn("The incoming event name \"" + name + "\" does not match the configured event name \"" + + eventName + "\", using configured event name"); + } + name = eventName; + } else { + if (name == null) { + throw new ApexEventRuntimeException( + "event received without mandatory parameter \"name\" on configuration or on event"); + } + } + + // Now, find the event definition in the model service. If version is null, the newest event + // definition in the model service is used + final AxEvent eventDefinition = ModelService.getModel(AxEvents.class).get(name, version); + if (eventDefinition == null) { + if (version == null) { + throw new ApexEventRuntimeException( + "an event definition for an event named \"" + name + "\" not found in Apex model"); + } else { + throw new ApexEventRuntimeException("an event definition for an event named \"" + name + + "\" with version \"" + version + "\" not found in Apex model"); + } + } + + // Use the defined event version if no version is specified on the incoming fields + if (version == null) { + version = eventDefinition.getKey().getVersion(); + } + + // Check the name space is OK if it is defined, if not, use the name space from the model + if (namespace != null) { + if (!namespace.equals(eventDefinition.getNameSpace())) { + throw new ApexEventRuntimeException( + "namespace \"" + namespace + "\" on event \"" + name + "\" does not match namespace \"" + + eventDefinition.getNameSpace() + "\" for that event in the Apex model"); + } + } else { + namespace = eventDefinition.getNameSpace(); + } + + // For source, use the defined source only if the source is not found on the incoming event + if (source == null) { + source = eventDefinition.getSource(); + } + + // For target, use the defined source only if the source is not found on the incoming event + if (target == null) { + target = eventDefinition.getTarget(); + } + + return new ApexEvent(name, version, namespace, source, target); + } + + /** + * This method gets an event string field from a JSON object. + * + * @param jsonObject the JSON object containing the JSON representation of the incoming event + * @param fieldName the field name to find in the event + * @param fieldAlias the alias for the field to find in the event, overrides the field name if + * it is not null + * @param fieldRE the regular expression to check the field against for validity + * @param mandatory true if the field is mandatory + * @return the value of the field in the JSON object or null if the field is optional + * @throws ApexEventRuntimeException the apex event runtime exception + */ + private String getJSONStringField(final JsonObject jsonObject, final String fieldName, final String fieldAlias, + final String fieldRE, final boolean mandatory) throws ApexEventRuntimeException { + // Get the JSON field for the string field + final JsonElement jsonField = getJSONField(jsonObject, fieldName, fieldAlias, mandatory); + + // Null strings are allowed + if (jsonField == null || jsonField.isJsonNull()) { + return null; + } + + // Check if this is a string field + String fieldValueString = null; + try { + fieldValueString = jsonField.getAsString(); + } catch (final Exception e) { + // The element is not a string so throw an error + throw new ApexEventRuntimeException("field \"" + fieldName + "\" with type \"" + + jsonField.getClass().getCanonicalName() + "\" is not a string value"); + } + + // Is regular expression checking required + if (fieldRE == null) { + return fieldValueString; + } + + // Check the event field against its regular expression + if (!fieldValueString.matches(fieldRE)) { + throw new ApexEventRuntimeException( + "field \"" + fieldName + "\" with value \"" + fieldValueString + "\" is invalid"); + } + + return fieldValueString; + } + + /** + * This method gets an event field from a JSON object. + * + * @param jsonObject the JSON object containing the JSON representation of the incoming event + * @param fieldName the field name to find in the event + * @param fieldAlias the alias for the field to find in the event, overrides the field name if + * it is not null + * @param mandatory true if the field is mandatory + * @return the value of the field in the JSON object or null if the field is optional + * @throws ApexEventRuntimeException the apex event runtime exception + */ + private JsonElement getJSONField(final JsonObject jsonObject, final String fieldName, final String fieldAlias, + final boolean mandatory) throws ApexEventRuntimeException { + + // Check if we should use the alias for this field + String fieldToFind = fieldName; + if (fieldAlias != null) { + fieldToFind = fieldAlias; + } + + // Get the event field + final JsonElement eventElement = jsonObject.get(fieldToFind); + if (eventElement == null) { + if (!mandatory) { + return null; + } else { + throw new ApexEventRuntimeException("mandatory field \"" + fieldToFind + "\" is missing"); + } + } + + return eventElement; + } + + /** + * This method if a JSON object has a named field. + * + * @param jsonObject the JSON object containing the JSON representation of the incoming event + * @param fieldName the field name to find in the event + * @return true if the field is present + * @throws ApexEventRuntimeException the apex event runtime exception + */ + private boolean hasJSONField(final JsonObject jsonObject, final String fieldName) throws ApexEventRuntimeException { + // check for the field + return jsonObject.has(fieldName); + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/JSONEventProtocolParameters.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/JSONEventProtocolParameters.java new file mode 100644 index 000000000..b4a4055d7 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/JSONEventProtocolParameters.java @@ -0,0 +1,135 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.jsonprotocolplugin; + +import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolTextCharDelimitedParameters; + +/** + * Event protocol parameters for JSON as an event protocol. + * + * The parameters for this plugin are: + * <ol> + * <li>nameAlias: The field in a JSON event to use as an alias for the event name. This parameter is + * optional. + * <li>versionAlias: The field in a JSON event to use as an alias for the event version. This + * parameter is optional. + * <li>nameSpaceAlias: The field in a JSON event to use as an alias for the event name space. This + * parameter is optional. + * <li>sourceAlias: The field in a JSON event to use as an alias for the event source. This + * parameter is optional. + * <li>targetAlias: The field in a JSON event to use as an alias for the event target. This + * parameter is optional. + * </ol> + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class JSONEventProtocolParameters extends EventProtocolTextCharDelimitedParameters { + /** The label of this event protocol. */ + public static final String JSON_EVENT_PROTOCOL_LABEL = "JSON"; + + // Constants for text block delimiters + private static final char JSON_TEXT_BLOCK_START_DELIMITER = '{'; + private static final char JSON_TEXT_BLOCK_END_DELIMITER = '}'; + + // Aliases for Apex event header fields + // @formatter:off + private final String nameAlias = null; + private final String versionAlias = null; + private final String nameSpaceAlias = null; + private final String sourceAlias = null; + private final String targetAlias = null; + // @formatter:on + + /** + * Constructor to create a JSON event protocol parameter instance and register the instance with + * the parameter service. + */ + public JSONEventProtocolParameters() { + this(JSONEventProtocolParameters.class.getCanonicalName(), JSON_EVENT_PROTOCOL_LABEL); + } + + /** + * Constructor to create an event protocol parameters instance with the name of a sub class of + * this class. + * + * @param parameterClassName the class name of a sub class of this class + * @param eventProtocolLabel the name of the event protocol for this plugin + */ + public JSONEventProtocolParameters(final String parameterClassName, final String eventProtocolLabel) { + super(parameterClassName); + + // Set the event protocol properties for the JSON event protocol + this.setLabel(eventProtocolLabel); + + // Set the starting and ending delimiters for text blocks of JSON events + this.setStartChar(JSON_TEXT_BLOCK_START_DELIMITER); + this.setEndChar(JSON_TEXT_BLOCK_END_DELIMITER); + + // Set the event protocol plugin class + this.setEventProtocolPluginClass(Apex2JSONEventConverter.class.getCanonicalName()); + } + + /** + * Gets the name alias. + * + * @return the name alias + */ + public String getNameAlias() { + return nameAlias; + } + + /** + * Gets the version alias. + * + * @return the version alias + */ + public String getVersionAlias() { + return versionAlias; + } + + /** + * Gets the name space alias. + * + * @return the name space alias + */ + public String getNameSpaceAlias() { + return nameSpaceAlias; + } + + /** + * Gets the source alias. + * + * @return the source alias + */ + public String getSourceAlias() { + return sourceAlias; + } + + /** + * Gets the target alias. + * + * @return the target alias + */ + public String getTargetAlias() { + return targetAlias; + } + +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/package-info.java new file mode 100644 index 000000000..65f4831ec --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Contains the implementation of the APEX event protocol cinverter plugin for events in Json + * format. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +package org.onap.policy.apex.service.engine.event.impl.jsonprotocolplugin; diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/package-info.java new file mode 100644 index 000000000..ae2d58701 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/package-info.java @@ -0,0 +1,30 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Contains implementations for conversion between externally facing + * {@link org.onap.policy.apex.service.engine.event.ApexEvent} instances and internal APEX engine + * {@link org.onap.policy.apex.core.engine.event.EnEvent} instances. It also contains the + * implementation of the default APEX File carrier technology plugin as well as the protocol plugins + * for XML and JSON event protocols. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +package org.onap.policy.apex.service.engine.event.impl; diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/package-info.java new file mode 100644 index 000000000..75404137e --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/package-info.java @@ -0,0 +1,32 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Provides a generic externally-facing {@link ApexEvent} class that can be sent into an APEX engine + * and processed by an APEX engine. It provides the producer {@link ApexEventProducer} producer and + * {@link ApexEventConsumer} consumer interfaces that APEX uses to send events to and receive events + * from other systems. It also provides the {@link ApexEventConverter} interface that can be + * implemented by plugins that wish to convert some external event format into the APEX event + * format. It also provides a periodic event generator that can be used to send periodic events into + * an APEX engine for triggering of policies to carry out housekeeping tasks. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +package org.onap.policy.apex.service.engine.event; |