summaryrefslogtreecommitdiffstats
path: root/services/services-engine/src/main/java/org/onap/policy/apex/service/engine
diff options
context:
space:
mode:
Diffstat (limited to 'services/services-engine/src/main/java/org/onap/policy/apex/service/engine')
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessageListener.java96
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessagingService.java7
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEvent.java42
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexPeriodicEventGenerator.java8
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/SynchronousEventCache.java41
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventConsumerFactory.java5
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProducerFactory.java5
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/ApexEvent2EnEventConverter.java19
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/FileCarrierTechnologyParameters.java (renamed from services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/FILECarrierTechnologyParameters.java)45
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java61
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/CharacterDelimitedTextBlockReader.java35
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/HeaderDelimitedTextBlockReader.java2
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java10
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/Apex2JsonEventConverter.java (renamed from services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/Apex2JSONEventConverter.java)164
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/JsonEventProtocolParameters.java (renamed from services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/JSONEventProtocolParameters.java)10
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexCommandLineArguments.java23
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventMarshaller.java8
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java60
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java8
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/EngineService.java2
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java166
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java74
22 files changed, 448 insertions, 443 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessageListener.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessageListener.java
index 3840e915d..a9b862d41 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessageListener.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessageListener.java
@@ -20,6 +20,8 @@
package org.onap.policy.apex.service.engine.engdep;
+import com.google.common.eventbus.Subscribe;
+
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@@ -50,17 +52,14 @@ import org.onap.policy.apex.service.engine.runtime.EngineService;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
-import com.google.common.eventbus.Subscribe;
-
/**
- * The listener interface for receiving engDepMessage events. The class that is interested in
- * processing a engDepMessage event implements this interface, and the object created with that
- * class is registered with a component using the component's <code>addEngDepMessageListener</code>
- * method. When the engDepMessage event occurs, that object's appropriate method is invoked.
+ * The listener interface for receiving engDepMessage events. The class that is interested in processing a engDepMessage
+ * event implements this interface, and the object created with that class is registered with a component using the
+ * component's <code>addEngDepMessageListener</code> method. When the engDepMessage event occurs, that object's
+ * appropriate method is invoked.
*
- * <p>This class uses a queue to buffer incoming messages. When the listener is called, it places the
- * incoming message on the queue. A thread runs which removes the messages from the queue and
- * forwards them to the Apex engine.
+ * <p>This class uses a queue to buffer incoming messages. When the listener is called, it places the incoming message
+ * on the queue. A thread runs which removes the messages from the queue and forwards them to the Apex engine.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
*/
@@ -83,9 +82,8 @@ public class EngDepMessageListener implements MessageListener<Message>, Runnable
private final BlockingQueue<MessageBlock<Message>> messageQueue = new LinkedBlockingDeque<>();
/**
- * Instantiates a new EngDep message listener for listening for messages coming in from the
- * Deployment client. The <code>apexService</code> is the Apex service to send the messages
- * onto.
+ * Instantiates a new EngDep message listener for listening for messages coming in from the Deployment client. The
+ * <code>apexService</code> is the Apex service to send the messages onto.
*
* @param apexService the Apex engine service
*/
@@ -94,8 +92,8 @@ public class EngDepMessageListener implements MessageListener<Message>, Runnable
}
/**
- * This method is an implementation of the message listener. It receives a message and places it
- * on the queue for processing by the message listening thread.
+ * This method is an implementation of the message listener. It receives a message and places it on the queue for
+ * processing by the message listening thread.
*
* @param data the data
* @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage
@@ -106,8 +104,8 @@ public class EngDepMessageListener implements MessageListener<Message>, Runnable
public void onMessage(final MessageBlock<Message> data) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("message received from client application {} port {}",
- data.getConnection().getRemoteSocketAddress().getAddress(),
- data.getConnection().getRemoteSocketAddress().getPort());
+ data.getConnection().getRemoteSocketAddress().getAddress(),
+ data.getConnection().getRemoteSocketAddress().getPort());
}
messageQueue.add(data);
}
@@ -115,8 +113,7 @@ public class EngDepMessageListener implements MessageListener<Message>, Runnable
/*
* (non-Javadoc)
*
- * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang.
- * String)
+ * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang. String)
*/
@Override
public void onMessage(final String messageString) {
@@ -148,8 +145,7 @@ public class EngDepMessageListener implements MessageListener<Message>, Runnable
}
/**
- * Runs the message listening thread. Here, the messages come in on the message queue and are
- * processed one by one
+ * Runs the message listening thread. Here, the messages come in on the message queue and are processed one by one
*/
@Override
public void run() {
@@ -173,8 +169,8 @@ public class EngDepMessageListener implements MessageListener<Message>, Runnable
}
/**
- * This method handles EngDep messages as they come in. It uses the inevitable switch statement
- * to handle the messages.
+ * This method handles EngDep messages as they come in. It uses the inevitable switch statement to handle the
+ * messages.
*
* @param message the incoming EngDep message
* @param webSocket the web socket on which the message came in
@@ -195,7 +191,7 @@ public class EngDepMessageListener implements MessageListener<Message>, Runnable
enDepAction = (EngDepAction) message.getAction();
} else {
throw new ApexException(message.getAction().getClass().getName()
- + "action on received message invalid, action must be of type \"EnDepAction\"");
+ + "action on received message invalid, action must be of type \"EnDepAction\"");
}
// Handle each incoming message using the inevitable switch statement for the EngDep
@@ -204,12 +200,12 @@ public class EngDepMessageListener implements MessageListener<Message>, Runnable
case GET_ENGINE_SERVICE_INFO:
final GetEngineServiceInfo engineServiceInformationMessage = (GetEngineServiceInfo) message;
LOGGER.debug("getting engine service information for engine service " + apexService.getKey().getId()
- + " . . .");
+ + " . . .");
// Send a reply with the engine service information
sendServiceInfoReply(webSocket, engineServiceInformationMessage, apexService.getKey(),
- apexService.getEngineKeys(), apexService.getApexModelKey());
- LOGGER.debug(
- "returned engine service information for engine service " + apexService.getKey().getId());
+ apexService.getEngineKeys(), apexService.getApexModelKey());
+ LOGGER.debug("returned engine service information for engine service "
+ + apexService.getKey().getId());
break;
case UPDATE_MODEL:
@@ -217,10 +213,10 @@ public class EngDepMessageListener implements MessageListener<Message>, Runnable
LOGGER.debug("updating model in engine {} . . .", updateModelMessage.getTarget().getId());
// Update the model
apexService.updateModel(updateModelMessage.getTarget(), updateModelMessage.getMessageData(),
- updateModelMessage.isForceInstall());
+ updateModelMessage.isForceInstall());
// Send a reply indicating the message action worked
sendReply(webSocket, updateModelMessage, true,
- "updated model in engine " + updateModelMessage.getTarget().getId());
+ "updated model in engine " + updateModelMessage.getTarget().getId());
LOGGER.debug("updated model in engine service {}", updateModelMessage.getTarget().getId());
break;
@@ -231,7 +227,7 @@ public class EngDepMessageListener implements MessageListener<Message>, Runnable
apexService.start(startEngineMessage.getTarget());
// Send a reply indicating the message action worked
sendReply(webSocket, startEngineMessage, true,
- "started engine " + startEngineMessage.getTarget().getId());
+ "started engine " + startEngineMessage.getTarget().getId());
LOGGER.debug("started engine {}", startEngineMessage.getTarget().getId());
break;
@@ -242,33 +238,33 @@ public class EngDepMessageListener implements MessageListener<Message>, Runnable
apexService.stop(stopEngineMessage.getTarget());
// Send a reply indicating the message action worked
sendReply(webSocket, stopEngineMessage, true,
- "stopped engine " + stopEngineMessage.getTarget().getId());
+ "stopped engine " + stopEngineMessage.getTarget().getId());
LOGGER.debug("stopping engine {}", stopEngineMessage.getTarget().getId());
break;
case START_PERIODIC_EVENTS:
final StartPeriodicEvents startPeriodicEventsMessage = (StartPeriodicEvents) message;
LOGGER.debug("starting periodic events on engine {} . . .",
- startPeriodicEventsMessage.getTarget().getId());
+ startPeriodicEventsMessage.getTarget().getId());
// Start periodic events with the period specified in the message
final Long period = Long.parseLong(startPeriodicEventsMessage.getMessageData());
apexService.startPeriodicEvents(period);
// Send a reply indicating the message action worked
- sendReply(webSocket, startPeriodicEventsMessage, true, "started periodic events on engine "
- + startPeriodicEventsMessage.getTarget().getId() + " with period " + period);
- LOGGER.debug("started periodic events on engine " + startPeriodicEventsMessage.getTarget().getId()
- + " with period " + period);
+ String periodicStartedMessage = "started periodic events on engine "
+ + startPeriodicEventsMessage.getTarget().getId() + " with period " + period;
+ sendReply(webSocket, startPeriodicEventsMessage, true, periodicStartedMessage);
+ LOGGER.debug(periodicStartedMessage);
break;
case STOP_PERIODIC_EVENTS:
final StopPeriodicEvents stopPeriodicEventsMessage = (StopPeriodicEvents) message;
LOGGER.debug("stopping periodic events on engine {} . . .",
- stopPeriodicEventsMessage.getTarget().getId());
+ stopPeriodicEventsMessage.getTarget().getId());
// Stop periodic events
apexService.stopPeriodicEvents();
// Send a reply indicating the message action worked
- sendReply(webSocket, stopPeriodicEventsMessage, true,
- "stopped periodic events on engine " + stopPeriodicEventsMessage.getTarget().getId());
+ sendReply(webSocket, stopPeriodicEventsMessage, true, "stopped periodic events on engine "
+ + stopPeriodicEventsMessage.getTarget().getId());
LOGGER.debug("stopped periodic events on engine " + stopPeriodicEventsMessage.getTarget().getId());
break;
@@ -277,7 +273,7 @@ public class EngDepMessageListener implements MessageListener<Message>, Runnable
LOGGER.debug("getting status for engine{} . . .", getEngineStatusMessage.getTarget().getId());
// Send a reply with the engine status
sendReply(webSocket, getEngineStatusMessage, true,
- apexService.getStatus(getEngineStatusMessage.getTarget()));
+ apexService.getStatus(getEngineStatusMessage.getTarget()));
LOGGER.debug("returned status for engine {}", getEngineStatusMessage.getTarget().getId());
break;
@@ -313,7 +309,7 @@ public class EngDepMessageListener implements MessageListener<Message>, Runnable
* @param messageData the message data
*/
private void sendReply(final WebSocket client, final Message requestMessage, final boolean result,
- final String messageData) {
+ final String messageData) {
LOGGER.entry(result, messageData);
if (client == null || !client.isOpen()) {
@@ -321,8 +317,9 @@ public class EngDepMessageListener implements MessageListener<Message>, Runnable
return;
}
- LOGGER.debug("sending {} to web socket {}", requestMessage.getAction(),
- client.getRemoteSocketAddress().toString());
+ String replyString = "sending " + requestMessage.getAction() + " to web socket "
+ + client.getRemoteSocketAddress().toString();
+ LOGGER.debug(replyString);
final Response responseMessage = new Response(requestMessage.getTarget(), result, requestMessage);
responseMessage.setMessageData(messageData);
@@ -344,14 +341,15 @@ public class EngDepMessageListener implements MessageListener<Message>, Runnable
* @param apexModelKey the apex model key
*/
private void sendServiceInfoReply(final WebSocket client, final Message requestMessage,
- final AxArtifactKey engineServiceKey, final Collection<AxArtifactKey> engineKeyCollection,
- final AxArtifactKey apexModelKey) {
+ final AxArtifactKey engineServiceKey, final Collection<AxArtifactKey> engineKeyCollection,
+ final AxArtifactKey apexModelKey) {
LOGGER.entry();
- LOGGER.debug("sending {} to web socket {}", requestMessage.getAction(),
- client.getRemoteSocketAddress().toString());
+ String sendingMessage = "sending " + requestMessage.getAction() + " to web socket "
+ + client.getRemoteSocketAddress().toString();
+ LOGGER.debug(sendingMessage);
- final EngineServiceInfoResponse responseMessage =
- new EngineServiceInfoResponse(requestMessage.getTarget(), true, requestMessage);
+ final EngineServiceInfoResponse responseMessage = new EngineServiceInfoResponse(requestMessage.getTarget(),
+ true, requestMessage);
responseMessage.setMessageData("engine service information");
responseMessage.setEngineServiceKey(engineServiceKey);
responseMessage.setEngineKeyArray(engineKeyCollection);
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessagingService.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessagingService.java
index 86589ac81..7ebcad830 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessagingService.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessagingService.java
@@ -22,14 +22,13 @@ package org.onap.policy.apex.service.engine.engdep;
import java.net.InetSocketAddress;
-import org.onap.policy.apex.service.engine.runtime.EngineService;
-import org.slf4j.ext.XLogger;
-import org.slf4j.ext.XLoggerFactory;
-
import org.onap.policy.apex.core.infrastructure.messaging.MessagingService;
import org.onap.policy.apex.core.infrastructure.messaging.MessagingServiceFactory;
import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils;
import org.onap.policy.apex.core.protocols.Message;
+import org.onap.policy.apex.service.engine.runtime.EngineService;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
/**
* The Class EngDepMessagingService is used to encapsulate the server side of EngDep communication.
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEvent.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEvent.java
index 38762ea97..8c7911bce 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEvent.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEvent.java
@@ -37,6 +37,9 @@ import org.slf4j.ext.XLoggerFactory;
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
public class ApexEvent extends HashMap<String, Object> implements Serializable {
+ // Recurring string constants
+ private static final String EVENT_PREAMBLE = "event \"";
+
private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEvent.class);
private static final long serialVersionUID = -4451918242101961685L;
@@ -111,7 +114,7 @@ public class ApexEvent extends HashMap<String, Object> implements Serializable {
// An identifier for the current event execution. The default value here will always be unique
// in a single JVM
- private long executionID = ApexEvent.getNextExecutionID();
+ private long executionId = ApexEvent.getNextExecutionId();
// A string holding a message that indicates why processing of this event threw an exception
private String exceptionMessage;
@@ -122,7 +125,7 @@ public class ApexEvent extends HashMap<String, Object> implements Serializable {
*
* @return the next candidate value for a Execution ID
*/
- private static synchronized long getNextExecutionID() {
+ private static synchronized long getNextExecutionId() {
return nextExecutionID.getAndIncrement();
}
@@ -139,11 +142,11 @@ public class ApexEvent extends HashMap<String, Object> implements Serializable {
public ApexEvent(final String name, final String version, final String nameSpace, final String source,
final String target) throws ApexEventException {
// @formatter:off
- this.name = validateField("name", name, NAME_REGEXP);
- this.version = validateField("version", version, VERSION_REGEXP);
- this.nameSpace = validateField("nameSpace", nameSpace, NAMESPACE_REGEXP);
- this.source = validateField("source", source, SOURCE_REGEXP);
- this.target = validateField("target", target, TARGET_REGEXP);
+ this.name = validateField(NAME_HEADER_FIELD, name, NAME_REGEXP);
+ this.version = validateField(VERSION_HEADER_FIELD, version, VERSION_REGEXP);
+ this.nameSpace = validateField(NAMESPACE_HEADER_FIELD, nameSpace, NAMESPACE_REGEXP);
+ this.source = validateField(SOURCE_HEADER_FIELD, source, SOURCE_REGEXP);
+ this.target = validateField(TARGET_HEADER_FIELD, target, TARGET_REGEXP);
// @formatter:on
}
@@ -161,10 +164,10 @@ public class ApexEvent extends HashMap<String, Object> implements Serializable {
if (fieldValue.matches(fieldRegexp)) {
return fieldValue;
} else {
- LOGGER.warn("event \"" + name + ": field \"" + fieldName + "=" + fieldValue
- + "\" is illegal. It doesn't match regex '" + fieldRegexp + "'");
- throw new ApexEventException(
- "event \"" + name + ": field \"" + fieldName + "=" + fieldValue + "\" is illegal");
+ String message = EVENT_PREAMBLE + name + ": field \"" + fieldName + "=" + fieldValue
+ + "\" is illegal. It doesn't match regex '" + fieldRegexp + "'";
+ LOGGER.warn(message);
+ throw new ApexEventException(message);
}
}
@@ -179,8 +182,9 @@ public class ApexEvent extends HashMap<String, Object> implements Serializable {
if (key.matches(AxReferenceKey.LOCAL_NAME_REGEXP)) {
return key;
} else {
- LOGGER.warn("event \"" + name + ": key \"" + key + "\" is illegal");
- throw new ApexEventException("event \"" + name + ": key \"" + key + "\" is illegal");
+ String message = EVENT_PREAMBLE + name + ": key \"" + key + "\" is illegal";
+ LOGGER.warn(message);
+ throw new ApexEventException(message);
}
}
@@ -234,8 +238,8 @@ public class ApexEvent extends HashMap<String, Object> implements Serializable {
*
* @return the executionID
*/
- public long getExecutionID() {
- return executionID;
+ public long getExecutionId() {
+ return executionId;
}
/**
@@ -243,10 +247,10 @@ public class ApexEvent extends HashMap<String, Object> implements Serializable {
* unique in the current JVM. For some applications/deployments this executionID may need to
* globally unique
*
- * @param executionID the executionID
+ * @param executionId the executionID
*/
- public void setExecutionID(final long executionID) {
- this.executionID = executionID;
+ public void setExecutionId(final long executionId) {
+ this.executionId = executionId;
}
/**
@@ -330,7 +334,7 @@ public class ApexEvent extends HashMap<String, Object> implements Serializable {
builder.append(",target=");
builder.append(target);
builder.append(",executionID=");
- builder.append(executionID);
+ builder.append(executionId);
builder.append(",exceptionMessage=");
builder.append(exceptionMessage);
builder.append(",");
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexPeriodicEventGenerator.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexPeriodicEventGenerator.java
index b34d5185c..32f87a7cc 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexPeriodicEventGenerator.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexPeriodicEventGenerator.java
@@ -86,7 +86,7 @@ public class ApexPeriodicEventGenerator extends TimerTask {
private final EngineServiceEventInterface engineServiceEventInterface;
// Timing information
- private long period = 0;
+ private long eventGeneratorPeriod = 0;
private long firstEventTime = 0;
private long lastEventTime = 0;
private long eventCount = 0;
@@ -102,7 +102,7 @@ public class ApexPeriodicEventGenerator extends TimerTask {
final long period) {
// Save the engine service reference and delay
this.engineServiceEventInterface = engineServiceEventInterface;
- this.period = period;
+ this.eventGeneratorPeriod = period;
timer = new Timer(ApexPeriodicEventGenerator.class.getSimpleName(), true);
timer.schedule(this, period, period);
@@ -128,7 +128,7 @@ public class ApexPeriodicEventGenerator extends TimerTask {
eventCount++;
// Set the fields in the periodic event
- periodicEventMap.put(PERIODIC_DELAY, period);
+ periodicEventMap.put(PERIODIC_DELAY, eventGeneratorPeriod);
periodicEventMap.put(PERIODIC_FIRST_TIME, firstEventTime);
periodicEventMap.put(PERIODIC_LAST_TIME, lastEventTime);
periodicEventMap.put(PERIODIC_CURRENT_TIME, currentEventTime);
@@ -170,7 +170,7 @@ public class ApexPeriodicEventGenerator extends TimerTask {
*/
@Override
public String toString() {
- return "ApexPeriodicEventGenerator [period=" + period + ", firstEventTime=" + firstEventTime
+ return "ApexPeriodicEventGenerator [period=" + eventGeneratorPeriod + ", firstEventTime=" + firstEventTime
+ ", lastEventTime=" + lastEventTime + ", eventCount=" + eventCount + "]";
}
}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/SynchronousEventCache.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/SynchronousEventCache.java
index 1830fc0e5..5e48a5894 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/SynchronousEventCache.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/SynchronousEventCache.java
@@ -33,9 +33,8 @@ import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
/**
- * This class holds a cache of the synchronous events sent into Apex and that have not yet been
- * replied to. It runs a thread to time out events that have not been replied to in the specified
- * timeout.
+ * This class holds a cache of the synchronous events sent into Apex and that have not yet been replied to. It runs a
+ * thread to time out events that have not been replied to in the specified timeout.
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
@@ -55,11 +54,10 @@ public class SynchronousEventCache extends PeeredReference implements Runnable {
private long synchronousEventTimeout = DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT;
// Map holding outstanding synchronous events
- private final Map<Long, SimpleEntry<Long, Object>> toApexEventMap = new HashMap<Long, SimpleEntry<Long, Object>>();
+ private final Map<Long, SimpleEntry<Long, Object>> toApexEventMap = new HashMap<>();
// Map holding reply events
- private final Map<Long, SimpleEntry<Long, Object>> fromApexEventMap =
- new HashMap<Long, SimpleEntry<Long, Object>>();
+ private final Map<Long, SimpleEntry<Long, Object>> fromApexEventMap = new HashMap<>();
// The message listener thread and stopping flag
private final Thread synchronousEventCacheThread;
@@ -71,11 +69,10 @@ public class SynchronousEventCache extends PeeredReference implements Runnable {
* @param peeredMode the peered mode for which to return the reference
* @param consumer the consumer that is populating the cache
* @param producer the producer that is emptying the cache
- * @param synchronousEventTimeout the time in milliseconds to wait for the reply to a sent
- * synchronous event
+ * @param synchronousEventTimeout the time in milliseconds to wait for the reply to a sent synchronous event
*/
public SynchronousEventCache(final EventHandlerPeeredMode peeredMode, final ApexEventConsumer consumer,
- final ApexEventProducer producer, final long synchronousEventTimeout) {
+ final ApexEventProducer producer, final long synchronousEventTimeout) {
super(peeredMode, consumer, producer);
if (synchronousEventTimeout != 0) {
@@ -211,7 +208,8 @@ public class SynchronousEventCache extends PeeredReference implements Runnable {
// Check if there are any unprocessed events
if (!toApexEventMap.isEmpty()) {
- LOGGER.warn(toApexEventMap.size() + " synchronous events dropped due to system shutdown");
+ String message = toApexEventMap.size() + " synchronous events dropped due to system shutdown";
+ LOGGER.warn(message);
}
toApexEventMap.clear();
@@ -226,7 +224,7 @@ public class SynchronousEventCache extends PeeredReference implements Runnable {
* @param event the event to cache
*/
private void cacheSynchronizedEvent(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap,
- final long executionId, final Object event) {
+ final long executionId, final Object event) {
LOGGER.entry("Adding event with execution ID: " + executionId);
// Check if the event is already in the cache
@@ -234,7 +232,8 @@ public class SynchronousEventCache extends PeeredReference implements Runnable {
// If there was no sent event then the event timed out or some unexpected event was
// received
final String errorMessage = "an event with ID " + executionId
- + " already exists in the synchronous event cache, execution IDs must be unique in the system";
+ + " already exists in the synchronous event cache, "
+ + "execution IDs must be unique in the system";
LOGGER.warn(errorMessage);
throw new ApexEventRuntimeException(errorMessage);
}
@@ -243,7 +242,8 @@ public class SynchronousEventCache extends PeeredReference implements Runnable {
eventCacheMap.put(executionId, new SimpleEntry<Long, Object>(System.currentTimeMillis(), event));
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("event has been cached:" + event);
+ String message = "event has been cached:" + event;
+ LOGGER.debug(message);
}
LOGGER.exit("Added: " + executionId);
@@ -257,7 +257,7 @@ public class SynchronousEventCache extends PeeredReference implements Runnable {
* @return The removed event
*/
private Object removeCachedEventIfExists(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap,
- final long executionId) {
+ final long executionId) {
LOGGER.entry("Removing: " + executionId);
final SimpleEntry<Long, Object> removedEventEntry = eventCacheMap.remove(executionId);
@@ -273,8 +273,8 @@ public class SynchronousEventCache extends PeeredReference implements Runnable {
}
/**
- * Time out events on an event cache map. Events that have a timeout longer than the configured
- * timeout are timed out.
+ * Time out events on an event cache map. Events that have a timeout longer than the configured timeout are timed
+ * out.
*
* @param eventCacheMap the event cache to operate on
*/
@@ -293,12 +293,13 @@ public class SynchronousEventCache extends PeeredReference implements Runnable {
}
// Remove timed out events from the map
- for (final long timedoutEventExecutionID : timedOutEventSet) {
+ for (final long timedoutEventExecutionId : timedOutEventSet) {
// Remove the map entry and issue a warning
- final SimpleEntry<Long, Object> timedOutEventEntry = eventCacheMap.remove(timedoutEventExecutionID);
+ final SimpleEntry<Long, Object> timedOutEventEntry = eventCacheMap.remove(timedoutEventExecutionId);
- LOGGER.warn("synchronous event timed out, reply not received in " + synchronousEventTimeout
- + " milliseconds on event " + timedOutEventEntry.getValue());
+ String message = "synchronous event timed out, reply not received in " + synchronousEventTimeout
+ + " milliseconds on event " + timedOutEventEntry.getValue();
+ LOGGER.warn(message);
}
}
}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventConsumerFactory.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventConsumerFactory.java
index 8f54c049b..5c44f2d7d 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventConsumerFactory.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventConsumerFactory.java
@@ -37,11 +37,6 @@ public class EventConsumerFactory {
private static final XLogger LOGGER = XLoggerFactory.getXLogger(EventConsumerFactory.class);
/**
- * Empty constructor with no generic overloading.
- */
- public EventConsumerFactory() {}
-
- /**
* Create an event consumer of the required type for the specified consumer technology.
*
* @param name the name of the consumer
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProducerFactory.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProducerFactory.java
index 9bbbad362..727f77995 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProducerFactory.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProducerFactory.java
@@ -37,11 +37,6 @@ public class EventProducerFactory {
private static final XLogger LOGGER = XLoggerFactory.getXLogger(EventProducerFactory.class);
/**
- * Empty constructor with no generic overloading.
- */
- public EventProducerFactory() {}
-
- /**
* Create an event producer of the required type for the specified producer technology.
*
* @param name the name of the producer
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/ApexEvent2EnEventConverter.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/ApexEvent2EnEventConverter.java
index 34690cc7d..5fce2c89f 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/ApexEvent2EnEventConverter.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/ApexEvent2EnEventConverter.java
@@ -23,19 +23,18 @@ package org.onap.policy.apex.service.engine.event.impl.enevent;
import java.util.ArrayList;
import java.util.List;
-import org.onap.policy.apex.service.engine.event.ApexEvent;
-import org.onap.policy.apex.service.engine.event.ApexEventConverter;
-import org.onap.policy.apex.service.engine.event.ApexEventException;
-import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
-import org.slf4j.ext.XLogger;
-import org.slf4j.ext.XLoggerFactory;
-
import org.onap.policy.apex.core.engine.engine.ApexEngine;
import org.onap.policy.apex.core.engine.event.EnEvent;
import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
import org.onap.policy.apex.model.basicmodel.service.ModelService;
import org.onap.policy.apex.model.eventmodel.concepts.AxEvent;
import org.onap.policy.apex.model.eventmodel.concepts.AxEvents;
+import org.onap.policy.apex.service.engine.event.ApexEvent;
+import org.onap.policy.apex.service.engine.event.ApexEventConverter;
+import org.onap.policy.apex.service.engine.event.ApexEventException;
+import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
/**
* The Class ApexEvent2EnEventConverter converts externally facing {@link ApexEvent} instances to
@@ -90,7 +89,7 @@ public final class ApexEvent2EnEventConverter implements ApexEventConverter {
axEvent.getNameSpace(), axEvent.getSource(), axEvent.getTarget());
// Copy the ExecutionID from the EnEvent into the ApexEvent
- apexEvent.setExecutionID(enEvent.getExecutionId());
+ apexEvent.setExecutionId(enEvent.getExecutionId());
// Copy he exception message to the Apex event if it is set
if (enEvent.getExceptionMessage() != null) {
@@ -101,7 +100,7 @@ public final class ApexEvent2EnEventConverter implements ApexEventConverter {
apexEvent.putAll(enEvent);
// Return the event in a single element
- final ArrayList<ApexEvent> eventList = new ArrayList<ApexEvent>();
+ final ArrayList<ApexEvent> eventList = new ArrayList<>();
eventList.add(apexEvent);
return eventList;
}
@@ -136,7 +135,7 @@ public final class ApexEvent2EnEventConverter implements ApexEventConverter {
enEvent.putAll(apexEvent);
// copy the ExecutionID from the ApexEvent into the EnEvent
- enEvent.setExecutionId(apexEvent.getExecutionID());
+ enEvent.setExecutionId(apexEvent.getExecutionId());
return enEvent;
}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/FILECarrierTechnologyParameters.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/FileCarrierTechnologyParameters.java
index 84d19fc62..cbfe18016 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/FILECarrierTechnologyParameters.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/FileCarrierTechnologyParameters.java
@@ -31,22 +31,17 @@ import org.onap.policy.common.utils.resources.ResourceUtils;
* This class holds the parameters that allows transport of events into and out of Apex using files and standard input
* and output.
*
- * <p>
- * The following parameters are defined:
- * <ol>
- * <li>fileName: The full path to the file from which to read events or to which to write events.
- * <li>standardIO: If this flag is set to true, then standard input is used to read events in or standard output is used
- * to write events and the fileName parameter is ignored if present
- * <li>standardError: If this flag is set to true, then standard error is used to write events
- * <li>streamingMode: If this flag is set to true, then streaming mode is set for reading events and event handling will
- * wait on the input stream for events until the stream is closed. If streaming model is off, then event reading
- * completes when the end of input is detected.
- * <li>startDelay: The amount of milliseconds to wait at startup startup before processing the first event.
- * </ol>
+ * <p>The following parameters are defined: <ol> <li>fileName: The full path to the file from which to read events or to
+ * which to write events. <li>standardIO: If this flag is set to true, then standard input is used to read events in or
+ * standard output is used to write events and the fileName parameter is ignored if present <li>standardError: If this
+ * flag is set to true, then standard error is used to write events <li>streamingMode: If this flag is set to true, then
+ * streaming mode is set for reading events and event handling will wait on the input stream for events until the stream
+ * is closed. If streaming model is off, then event reading completes when the end of input is detected. <li>startDelay:
+ * The amount of milliseconds to wait at startup startup before processing the first event. </ol>
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
-public class FILECarrierTechnologyParameters extends CarrierTechnologyParameters {
+public class FileCarrierTechnologyParameters extends CarrierTechnologyParameters {
// @formatter:off
/** The label of this carrier technology. */
public static final String FILE_CARRIER_TECHNOLOGY_LABEL = "FILE";
@@ -58,7 +53,7 @@ public class FILECarrierTechnologyParameters extends CarrierTechnologyParameters
public static final String FILE_EVENT_CONSUMER_PLUGIN_CLASS = ApexFileEventConsumer.class.getCanonicalName();
private String fileName;
- private boolean standardIO = false;
+ private boolean standardIo = false;
private boolean standardError = false;
private boolean streamingMode = false;
private long startDelay = 0;
@@ -68,7 +63,7 @@ public class FILECarrierTechnologyParameters extends CarrierTechnologyParameters
* Constructor to create a file carrier technology parameters instance and register the instance with the parameter
* service.
*/
- public FILECarrierTechnologyParameters() {
+ public FileCarrierTechnologyParameters() {
super();
// Set the carrier technology properties for the FILE carrier technology
@@ -91,8 +86,8 @@ public class FILECarrierTechnologyParameters extends CarrierTechnologyParameters
*
* @return true, if standard IO should be used for input or output
*/
- public boolean isStandardIO() {
- return standardIO;
+ public boolean isStandardIo() {
+ return standardIo;
}
/**
@@ -125,10 +120,10 @@ public class FILECarrierTechnologyParameters extends CarrierTechnologyParameters
/**
* Sets if standard IO should be used for event input or output.
*
- * @param standardIO if standard IO should be used for event input or output
+ * @param standardIo if standard IO should be used for event input or output
*/
- public void setStandardIO(final boolean standardIO) {
- this.standardIO = standardIO;
+ public void setStandardIo(final boolean standardIo) {
+ this.standardIo = standardIo;
}
/**
@@ -174,7 +169,7 @@ public class FILECarrierTechnologyParameters extends CarrierTechnologyParameters
*/
@Override
public String toString() {
- return "FILECarrierTechnologyParameters [fileName=" + fileName + ", standardIO=" + standardIO
+ return "FILECarrierTechnologyParameters [fileName=" + fileName + ", standardIO=" + standardIo
+ ", standardError=" + standardError + ", streamingMode=" + streamingMode + ", startDelay="
+ startDelay + "]";
}
@@ -198,12 +193,12 @@ public class FILECarrierTechnologyParameters extends CarrierTechnologyParameters
public GroupValidationResult validate() {
final GroupValidationResult result = super.validate();
- if (!standardIO && !standardError && (fileName == null || fileName.trim().length() == 0)) {
- result.setResult("fileName", ValidationStatus.INVALID,
- "fileName not specified or is blank or null, it must be specified as a valid file location");
+ if (!standardIo && !standardError && (fileName == null || fileName.trim().length() == 0)) {
+ result.setResult("fileName", ValidationStatus.INVALID, "fileName not specified or is blank or null, "
+ + "it must be specified as a valid file location");
}
- if (standardIO || standardError) {
+ if (standardIo || standardError) {
streamingMode = true;
}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java
index 7521c3a08..0f0996fb8 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java
@@ -33,23 +33,25 @@ import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
import org.onap.policy.apex.service.engine.event.ApexEventException;
import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
import org.onap.policy.apex.service.engine.event.PeeredReference;
-import org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.FILECarrierTechnologyParameters;
+import org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.FileCarrierTechnologyParameters;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Concrete implementation an Apex event consumer that reads events from a file. This consumer also
- * implements ApexEventProducer and therefore can be used as a synchronous consumer.
+ * Concrete implementation an Apex event consumer that reads events from a file. This consumer also implements
+ * ApexEventProducer and therefore can be used as a synchronous consumer.
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
public class ApexFileEventConsumer implements ApexEventConsumer, Runnable {
-
// Get a reference to the logger
private static final Logger LOGGER = LoggerFactory.getLogger(ApexFileEventConsumer.class);
+ // Recurring string constants
+ private static final String APEX_FILE_CONSUMER_PREAMBLE = "ApexFileConsumer \"";
+
// The input stream to read events from
private InputStream eventInputStream;
@@ -66,35 +68,34 @@ public class ApexFileEventConsumer implements ApexEventConsumer, Runnable {
private String consumerName = null;
// The specific carrier technology parameters for this consumer
- private FILECarrierTechnologyParameters fileCarrierTechnologyParameters;
+ private FileCarrierTechnologyParameters fileCarrierTechnologyParameters;
// The peer references for this event handler
- private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap =
- new EnumMap<>(EventHandlerPeeredMode.class);
+ private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(
+ EventHandlerPeeredMode.class);
// Holds the next identifier for event execution.
private static AtomicLong nextExecutionID = new AtomicLong(0L);
/**
- * Private utility to get the next candidate value for a Execution ID. This value will always be
- * unique in a single JVM
+ * Private utility to get the next candidate value for a Execution ID. This value will always be unique in a single
+ * JVM
*
* @return the next candidate value for a Execution ID
*/
- private static synchronized long getNextExecutionID() {
+ private static synchronized long getNextExecutionId() {
return nextExecutionID.getAndIncrement();
}
/*
* (non-Javadoc)
*
- * @see
- * org.onap.policy.apex.apps.uservice.consumer.ApexEventConsumer#init(org.onap.policy.apex.apps.
+ * @see org.onap.policy.apex.apps.uservice.consumer.ApexEventConsumer#init(org.onap.policy.apex.apps.
* uservice.consumer.ApexEventReceiver)
*/
@Override
public void init(final String name, final EventHandlerParameters consumerParameters,
- final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
+ final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
this.eventReceiver = incomingEventReceiver;
this.consumerName = name;
@@ -106,18 +107,18 @@ public class ApexFileEventConsumer implements ApexEventConsumer, Runnable {
}
// Check and get the file Properties
- if (!(consumerParameters.getCarrierTechnologyParameters() instanceof FILECarrierTechnologyParameters)) {
+ if (!(consumerParameters.getCarrierTechnologyParameters() instanceof FileCarrierTechnologyParameters)) {
final String errorMessage = "specified consumer properties for ApexFileConsumer \"" + consumerName
- + "\" are not applicable to a File consumer";
+ + "\" are not applicable to a File consumer";
LOGGER.warn(errorMessage);
throw new ApexEventException(errorMessage);
}
- fileCarrierTechnologyParameters =
- (FILECarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
+ fileCarrierTechnologyParameters = (FileCarrierTechnologyParameters) consumerParameters
+ .getCarrierTechnologyParameters();
// Open the file producing events
try {
- if (fileCarrierTechnologyParameters.isStandardIO()) {
+ if (fileCarrierTechnologyParameters.isStandardIo()) {
eventInputStream = System.in;
} else {
eventInputStream = new FileInputStream(fileCarrierTechnologyParameters.getFileName());
@@ -125,10 +126,11 @@ public class ApexFileEventConsumer implements ApexEventConsumer, Runnable {
// Get an event composer for our event source
textBlockReader = new TextBlockReaderFactory().getTaggedReader(eventInputStream,
- consumerParameters.getEventProtocolParameters());
+ consumerParameters.getEventProtocolParameters());
} catch (final IOException e) {
- final String errorMessage = "ApexFileConsumer \"" + consumerName + "\" failed to open file for reading: \""
- + fileCarrierTechnologyParameters.getFileName() + "\"";
+ final String errorMessage = APEX_FILE_CONSUMER_PREAMBLE + consumerName
+ + "\" failed to open file for reading: \"" + fileCarrierTechnologyParameters.getFileName()
+ + "\"";
LOGGER.warn(errorMessage, e);
throw new ApexEventException(errorMessage, e);
}
@@ -195,7 +197,7 @@ public class ApexFileEventConsumer implements ApexEventConsumer, Runnable {
// Check that we have been initialized in async or sync mode
if (eventReceiver == null) {
LOGGER.warn("\"{}\" has not been initilaized for either asynchronous or synchronous event handling",
- consumerName);
+ consumerName);
return;
}
@@ -209,18 +211,19 @@ public class ApexFileEventConsumer implements ApexEventConsumer, Runnable {
// Process the event from the text block if there is one there
if (textBlock.getText() != null) {
- eventReceiver.receiveEvent(getNextExecutionID(), textBlock.getText());
+ eventReceiver.receiveEvent(getNextExecutionId(), textBlock.getText());
}
- } while (!textBlock.isEndOfText());
+ }
+ while (!textBlock.isEndOfText());
} catch (final Exception e) {
LOGGER.warn("\"" + consumerName + "\" failed to read event from file: \""
- + fileCarrierTechnologyParameters.getFileName() + "\"", e);
+ + fileCarrierTechnologyParameters.getFileName() + "\"", e);
} finally {
try {
eventInputStream.close();
} catch (final IOException e) {
- LOGGER.warn("ApexFileConsumer \"" + consumerName + "\" failed to close file: \""
- + fileCarrierTechnologyParameters.getFileName() + "\"", e);
+ LOGGER.warn(APEX_FILE_CONSUMER_PREAMBLE + consumerName + "\" failed to close file: \""
+ + fileCarrierTechnologyParameters.getFileName() + "\"", e);
}
}
@@ -236,8 +239,8 @@ public class ApexFileEventConsumer implements ApexEventConsumer, Runnable {
try {
eventInputStream.close();
} catch (final IOException e) {
- LOGGER.warn("ApexFileConsumer \"" + consumerName + "\" failed to close file for reading: \""
- + fileCarrierTechnologyParameters.getFileName() + "\"", e);
+ LOGGER.warn(APEX_FILE_CONSUMER_PREAMBLE + consumerName + "\" failed to close file for reading: \""
+ + fileCarrierTechnologyParameters.getFileName() + "\"", e);
}
if (consumerThread.isAlive() && !consumerThread.isInterrupted()) {
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/CharacterDelimitedTextBlockReader.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/CharacterDelimitedTextBlockReader.java
index b286f8afe..bd7310d0a 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/CharacterDelimitedTextBlockReader.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/CharacterDelimitedTextBlockReader.java
@@ -93,12 +93,29 @@ public class CharacterDelimitedTextBlockReader implements TextBlockReader {
return new TextBlock(eofOnInputStream, null);
}
- // The initial nesting level of incoming text blocks is always zero
- int nestingLevel = 0;
+ // Read the block of text
+ final StringBuilder textBlockBuilder = readTextBlockText();
+ // Condition the text block and return it
+ final String textBlock = textBlockBuilder.toString().trim();
+ if (textBlock.length() > 0) {
+ return new TextBlock(eofOnInputStream, textBlock);
+ } else {
+ return new TextBlock(eofOnInputStream, null);
+ }
+ }
+
+ /**
+ * Read a block of text.
+ * @return A string builder containing the text
+ * @throws IOException on read errors
+ */
+ private StringBuilder readTextBlockText() throws IOException {
// Holder for the text block
final StringBuilder textBlockBuilder = new StringBuilder();
+ int nestingLevel = 0;
+
// Read the next text block
while (true) {
final char nextChar = (char) inputStream.read();
@@ -106,13 +123,13 @@ public class CharacterDelimitedTextBlockReader implements TextBlockReader {
// Check for EOF
if (nextChar == (char) -1) {
eofOnInputStream = true;
- break;
+ return textBlockBuilder;
}
if (nextChar == startTagChar) {
nestingLevel++;
} else if (nestingLevel == 0 && !Character.isWhitespace(nextChar)) {
- LOGGER.warn("invalid input on consumer: " + nextChar);
+ LOGGER.warn("invalid input on consumer: {}", nextChar);
continue;
}
@@ -125,17 +142,9 @@ public class CharacterDelimitedTextBlockReader implements TextBlockReader {
}
if (nestingLevel == 0) {
- break;
+ return textBlockBuilder;
}
}
}
-
- // Condition the text block and return it
- final String textBlock = textBlockBuilder.toString().trim();
- if (textBlock.length() > 0) {
- return new TextBlock(eofOnInputStream, textBlock);
- } else {
- return new TextBlock(eofOnInputStream, null);
- }
}
}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/HeaderDelimitedTextBlockReader.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/HeaderDelimitedTextBlockReader.java
index 07185c024..982044022 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/HeaderDelimitedTextBlockReader.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/HeaderDelimitedTextBlockReader.java
@@ -162,7 +162,7 @@ public class HeaderDelimitedTextBlockReader implements TextBlockReader, Runnable
// Condition the text block and return it
final String textBlock = textBlockBuilder.toString().trim();
- final boolean endOfText = (eofOnInputStream && textLineQueue.isEmpty() ? true : false);
+ final boolean endOfText = eofOnInputStream && textLineQueue.isEmpty();
if (textBlock.length() > 0) {
return new TextBlock(endOfText, textBlock);
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java
index d5f9ff1b2..e12b772df 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java
@@ -32,7 +32,7 @@ import org.onap.policy.apex.service.engine.event.ApexEventProducer;
import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
import org.onap.policy.apex.service.engine.event.PeeredReference;
import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
-import org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.FILECarrierTechnologyParameters;
+import org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.FileCarrierTechnologyParameters;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
import org.slf4j.Logger;
@@ -74,20 +74,20 @@ public class ApexFileEventProducer implements ApexEventProducer {
}
// Check and get the file Properties
- if (!(producerParameters.getCarrierTechnologyParameters() instanceof FILECarrierTechnologyParameters)) {
+ if (!(producerParameters.getCarrierTechnologyParameters() instanceof FileCarrierTechnologyParameters)) {
final String errorMessage = "specified producer properties for ApexFileProducer \"" + producerName
+ "\" are not applicable to a FILE producer";
LOGGER.warn(errorMessage);
throw new ApexEventException(errorMessage);
}
- final FILECarrierTechnologyParameters fileCarrierTechnologyParameters =
- (FILECarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
+ final FileCarrierTechnologyParameters fileCarrierTechnologyParameters =
+ (FileCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
// Now we create a writer for events
try {
if (fileCarrierTechnologyParameters.isStandardError()) {
eventOutputStream = System.err;
- } else if (fileCarrierTechnologyParameters.isStandardIO()) {
+ } else if (fileCarrierTechnologyParameters.isStandardIo()) {
eventOutputStream = System.out;
} else {
eventOutputStream =
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/Apex2JSONEventConverter.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/Apex2JsonEventConverter.java
index 30e9db722..ce511be7f 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/Apex2JSONEventConverter.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/Apex2JsonEventConverter.java
@@ -20,6 +20,11 @@
package org.onap.policy.apex.service.engine.event.impl.jsonprotocolplugin;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
import java.util.ArrayList;
import java.util.List;
@@ -37,48 +42,40 @@ import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolParame
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
/**
- * The Class Apex2JSONEventConverter converts {@link ApexEvent} instances to and from JSON string
- * representations of Apex events.
+ * The Class Apex2JSONEventConverter converts {@link ApexEvent} instances to and from JSON string representations of
+ * Apex events.
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
-public class Apex2JSONEventConverter implements ApexEventProtocolConverter {
- private static final XLogger LOGGER = XLoggerFactory.getXLogger(Apex2JSONEventConverter.class);
+public class Apex2JsonEventConverter implements ApexEventProtocolConverter {
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(Apex2JsonEventConverter.class);
// The parameters for the JSON event protocol
- private JSONEventProtocolParameters jsonPars;
+ private JsonEventProtocolParameters jsonPars;
/*
* (non-Javadoc)
*
- * @see
- * org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter#init(org.onap.policy.
+ * @see org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter#init(org.onap.policy.
* apex.service.parameters.eventprotocol.EventProtocolParameters)
*/
@Override
public void init(final EventProtocolParameters parameters) {
// Check and get the JSON parameters
- if (!(parameters instanceof JSONEventProtocolParameters)) {
+ if (!(parameters instanceof JsonEventProtocolParameters)) {
final String errorMessage = "specified consumer properties are not applicable to the JSON event protocol";
LOGGER.warn(errorMessage);
throw new ApexEventRuntimeException(errorMessage);
}
- jsonPars = (JSONEventProtocolParameters) parameters;
+ jsonPars = (JsonEventProtocolParameters) parameters;
}
/*
* (non-Javadoc)
*
- * @see
- * org.onap.policy.apex.service.engine.event.ApexEventConverter#toApexEvent(java.lang.String,
- * java.lang.Object)
+ * @see org.onap.policy.apex.service.engine.event.ApexEventConverter#toApexEvent(java.lang.String, java.lang.Object)
*/
@Override
public List<ApexEvent> toApexEvent(final String eventName, final Object eventObject) throws ApexEventException {
@@ -104,8 +101,8 @@ public class Apex2JSONEventConverter implements ApexEventProtocolConverter {
try {
// We may have a single JSON object with a single event or an array of JSON objects
- final Object decodedJsonObject =
- new GsonBuilder().serializeNulls().create().fromJson(jsonEventString, Object.class);
+ final Object decodedJsonObject = new GsonBuilder().serializeNulls().create().fromJson(jsonEventString,
+ Object.class);
// Check if we have a list of objects
if (decodedJsonObject instanceof List) {
@@ -121,15 +118,15 @@ public class Apex2JSONEventConverter implements ApexEventProtocolConverter {
eventList.add(jsonObject2ApexEvent(eventName, (JsonObject) jsonListObject));
} else {
throw new ApexEventException("incoming event (" + jsonEventString
- + ") is a JSON object array containing an invalid object " + jsonListObject);
+ + ") is a JSON object array containing an invalid object " + jsonListObject);
}
}
} else {
eventList.add(jsonStringApexEvent(eventName, jsonEventString));
}
} catch (final Exception e) {
- final String errorString =
- "Failed to unmarshal JSON event: " + e.getMessage() + ", event=" + jsonEventString;
+ final String errorString = "Failed to unmarshal JSON event: " + e.getMessage() + ", event="
+ + jsonEventString;
LOGGER.warn(errorString, e);
throw new ApexEventException(errorString, e);
}
@@ -141,8 +138,7 @@ public class Apex2JSONEventConverter implements ApexEventProtocolConverter {
/*
* (non-Javadoc)
*
- * @see
- * org.onap.policy.apex.service.engine.event.ApexEventConverter#fromApexEvent(org.onap.policy.
+ * @see org.onap.policy.apex.service.engine.event.ApexEventConverter#fromApexEvent(org.onap.policy.
* apex.service.engine.event.ApexEvent)
*/
@Override
@@ -154,8 +150,8 @@ public class Apex2JSONEventConverter implements ApexEventProtocolConverter {
}
// Get the event definition for the event from the model service
- final AxEvent eventDefinition =
- ModelService.getModel(AxEvents.class).get(apexEvent.getName(), apexEvent.getVersion());
+ final AxEvent eventDefinition = ModelService.getModel(AxEvents.class).get(apexEvent.getName(),
+ apexEvent.getVersion());
// Use a GSON Json object to marshal the Apex event to JSON
final Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create();
@@ -177,7 +173,7 @@ public class Apex2JSONEventConverter implements ApexEventProtocolConverter {
if (!apexEvent.containsKey(fieldName)) {
if (!eventField.getOptional()) {
final String errorMessage = "error parsing " + eventDefinition.getId() + " event to Json. "
- + "Field \"" + fieldName + "\" is missing, but is mandatory. Fields: " + apexEvent;
+ + "Field \"" + fieldName + "\" is missing, but is mandatory. Fields: " + apexEvent;
LOGGER.debug(errorMessage);
throw new ApexEventRuntimeException(errorMessage);
}
@@ -187,8 +183,8 @@ public class Apex2JSONEventConverter implements ApexEventProtocolConverter {
final Object fieldValue = apexEvent.get(fieldName);
// Get the schema helper
- final SchemaHelper fieldSchemaHelper =
- new SchemaHelperFactory().createSchemaHelper(eventField.getKey(), eventField.getSchema());
+ final SchemaHelper fieldSchemaHelper = new SchemaHelperFactory().createSchemaHelper(eventField.getKey(),
+ eventField.getSchema());
jsonObject.add(fieldName, (JsonElement) fieldSchemaHelper.marshal2Object(fieldValue));
}
@@ -205,14 +201,14 @@ public class Apex2JSONEventConverter implements ApexEventProtocolConverter {
* @throws ApexEventException thrown on unmarshaling exceptions
*/
private ApexEvent jsonStringApexEvent(final String eventName, final String jsonEventString)
- throws ApexEventException {
+ throws ApexEventException {
// Use GSON to read the event string
- final JsonObject jsonObject =
- new GsonBuilder().serializeNulls().create().fromJson(jsonEventString, JsonObject.class);
+ final JsonObject jsonObject = new GsonBuilder().serializeNulls().create().fromJson(jsonEventString,
+ JsonObject.class);
if (jsonObject == null || !jsonObject.isJsonObject()) {
throw new ApexEventException(
- "incoming event (" + jsonEventString + ") is not a JSON object or an JSON object array");
+ "incoming event (" + jsonEventString + ") is not a JSON object or an JSON object array");
}
return jsonObject2ApexEvent(eventName, jsonObject);
@@ -227,33 +223,33 @@ public class Apex2JSONEventConverter implements ApexEventProtocolConverter {
* @throws ApexEventException thrown on unmarshaling exceptions
*/
private ApexEvent jsonObject2ApexEvent(final String eventName, final JsonObject jsonObject)
- throws ApexEventException {
+ throws ApexEventException {
// Process the mandatory Apex header
final ApexEvent apexEvent = processApexEventHeader(eventName, jsonObject);
// Get the event definition for the event from the model service
- final AxEvent eventDefinition =
- ModelService.getModel(AxEvents.class).get(apexEvent.getName(), apexEvent.getVersion());
+ final AxEvent eventDefinition = ModelService.getModel(AxEvents.class).get(apexEvent.getName(),
+ apexEvent.getVersion());
// Iterate over the input fields in the event
for (final AxField eventField : eventDefinition.getFields()) {
final String fieldName = eventField.getKey().getLocalName();
- if (!hasJSONField(jsonObject, fieldName)) {
+ if (!hasJsonField(jsonObject, fieldName)) {
if (!eventField.getOptional()) {
final String errorMessage = "error parsing " + eventDefinition.getId() + " event from Json. "
- + "Field \"" + fieldName + "\" is missing, but is mandatory.";
+ + "Field \"" + fieldName + "\" is missing, but is mandatory.";
LOGGER.debug(errorMessage);
throw new ApexEventException(errorMessage);
}
continue;
}
- final JsonElement fieldValue = getJSONField(jsonObject, fieldName, null, !eventField.getOptional());
+ final JsonElement fieldValue = getJsonField(jsonObject, fieldName, null, !eventField.getOptional());
if (fieldValue != null && !fieldValue.isJsonNull()) {
// Get the schema helper
- final SchemaHelper fieldSchemaHelper =
- new SchemaHelperFactory().createSchemaHelper(eventField.getKey(), eventField.getSchema());
+ final SchemaHelper fieldSchemaHelper = new SchemaHelperFactory().createSchemaHelper(eventField.getKey(),
+ eventField.getSchema());
apexEvent.put(fieldName, fieldSchemaHelper.createNewInstance(fieldValue));
} else {
apexEvent.put(fieldName, null);
@@ -273,46 +269,32 @@ public class Apex2JSONEventConverter implements ApexEventProtocolConverter {
* @throws ApexEventException on invalid events with missing header fields
*/
private ApexEvent processApexEventHeader(final String eventName, final JsonObject jsonObject)
- throws ApexEventException {
- // Get the event header fields
- // @formatter:off
- String name = getJSONStringField(jsonObject, ApexEvent.NAME_HEADER_FIELD,
- jsonPars.getNameAlias(), ApexEvent.NAME_REGEXP, false);
- String version = getJSONStringField(jsonObject, ApexEvent.VERSION_HEADER_FIELD,
- jsonPars.getVersionAlias(), ApexEvent.VERSION_REGEXP, false);
- String namespace = getJSONStringField(jsonObject, ApexEvent.NAMESPACE_HEADER_FIELD,
- jsonPars.getNameSpaceAlias(), ApexEvent.NAMESPACE_REGEXP, false);
- String source = getJSONStringField(jsonObject, ApexEvent.SOURCE_HEADER_FIELD,
- jsonPars.getSourceAlias(), ApexEvent.SOURCE_REGEXP, false);
- String target = getJSONStringField(jsonObject, ApexEvent.TARGET_HEADER_FIELD,
- jsonPars.getTargetAlias(), ApexEvent.TARGET_REGEXP, false);
- // @formatter:on
+ throws ApexEventException {
+ String name = getJsonStringField(jsonObject, ApexEvent.NAME_HEADER_FIELD, jsonPars.getNameAlias(),
+ ApexEvent.NAME_REGEXP, false);
// Check that an event name has been specified
if (name == null && eventName == null) {
throw new ApexEventRuntimeException(
- "event received without mandatory parameter \"name\" on configuration or on event");
+ "event received without mandatory parameter \"name\" on configuration or on event");
}
// Check if an event name was specified on the event parameters
if (eventName != null) {
if (name != null && !eventName.equals(name)) {
LOGGER.warn("The incoming event name \"{}\" does not match the configured event name \"{}\","
- + " using configured event name", name, eventName);
+ + " using configured event name", name, eventName);
}
name = eventName;
}
// Now, find the event definition in the model service. If version is null, the newest event
// definition in the model service is used
+ String version = getJsonStringField(jsonObject, ApexEvent.VERSION_HEADER_FIELD, jsonPars.getVersionAlias(),
+ ApexEvent.VERSION_REGEXP, false);
final AxEvent eventDefinition = ModelService.getModel(AxEvents.class).get(name, version);
if (eventDefinition == null) {
- if (version == null) {
- throw new ApexEventRuntimeException(
- "an event definition for an event named \"" + name + "\" not found in Apex model");
- }
- throw new ApexEventRuntimeException("an event definition for an event named \"" + name
- + "\" with version \"" + version + "\" not found in Apex model");
+ throwVersionException(name, version);
}
// Use the defined event version if no version is specified on the incoming fields
@@ -321,22 +303,28 @@ public class Apex2JSONEventConverter implements ApexEventProtocolConverter {
}
// Check the name space is OK if it is defined, if not, use the name space from the model
+ String namespace = getJsonStringField(jsonObject, ApexEvent.NAMESPACE_HEADER_FIELD,
+ jsonPars.getNameSpaceAlias(), ApexEvent.NAMESPACE_REGEXP, false);
if (namespace != null) {
if (!namespace.equals(eventDefinition.getNameSpace())) {
- throw new ApexEventRuntimeException(
- "namespace \"" + namespace + "\" on event \"" + name + "\" does not match namespace \""
- + eventDefinition.getNameSpace() + "\" for that event in the Apex model");
+ throw new ApexEventRuntimeException("namespace \"" + namespace + "\" on event \"" + name
+ + "\" does not match namespace \"" + eventDefinition.getNameSpace()
+ + "\" for that event in the Apex model");
}
} else {
namespace = eventDefinition.getNameSpace();
}
// For source, use the defined source only if the source is not found on the incoming event
+ String source = getJsonStringField(jsonObject, ApexEvent.SOURCE_HEADER_FIELD, jsonPars.getSourceAlias(),
+ ApexEvent.SOURCE_REGEXP, false);
if (source == null) {
source = eventDefinition.getSource();
}
// For target, use the defined source only if the source is not found on the incoming event
+ String target = getJsonStringField(jsonObject, ApexEvent.TARGET_HEADER_FIELD, jsonPars.getTargetAlias(),
+ ApexEvent.TARGET_REGEXP, false);
if (target == null) {
target = eventDefinition.getTarget();
}
@@ -345,21 +333,36 @@ public class Apex2JSONEventConverter implements ApexEventProtocolConverter {
}
/**
+ * Throw an exception on event name and/or version with the correct text.
+ * @param name The event name
+ * @param version The event version
+ */
+ private void throwVersionException(String name, String version) {
+ if (version == null) {
+ throw new ApexEventRuntimeException(
+ "an event definition for an event named \"" + name + "\" not found in Apex model");
+ }
+ else {
+ throw new ApexEventRuntimeException("an event definition for an event named \"" + name
+ + "\" with version \"" + version + "\" not found in Apex model");
+ }
+ }
+
+ /**
* This method gets an event string field from a JSON object.
*
* @param jsonObject the JSON object containing the JSON representation of the incoming event
* @param fieldName the field name to find in the event
- * @param fieldAlias the alias for the field to find in the event, overrides the field name if
- * it is not null
- * @param fieldRE the regular expression to check the field against for validity
+ * @param fieldAlias the alias for the field to find in the event, overrides the field name if it is not null
+ * @param fieldRegexp the regular expression to check the field against for validity
* @param mandatory true if the field is mandatory
* @return the value of the field in the JSON object or null if the field is optional
* @throws ApexEventRuntimeException the apex event runtime exception
*/
- private String getJSONStringField(final JsonObject jsonObject, final String fieldName, final String fieldAlias,
- final String fieldRE, final boolean mandatory) {
+ private String getJsonStringField(final JsonObject jsonObject, final String fieldName, final String fieldAlias,
+ final String fieldRegexp, final boolean mandatory) {
// Get the JSON field for the string field
- final JsonElement jsonField = getJSONField(jsonObject, fieldName, fieldAlias, mandatory);
+ final JsonElement jsonField = getJsonField(jsonObject, fieldName, fieldAlias, mandatory);
// Null strings are allowed
if (jsonField == null || jsonField.isJsonNull()) {
@@ -373,18 +376,18 @@ public class Apex2JSONEventConverter implements ApexEventProtocolConverter {
} catch (final Exception e) {
// The element is not a string so throw an error
throw new ApexEventRuntimeException("field \"" + fieldName + "\" with type \""
- + jsonField.getClass().getCanonicalName() + "\" is not a string value");
+ + jsonField.getClass().getCanonicalName() + "\" is not a string value");
}
// Is regular expression checking required
- if (fieldRE == null) {
+ if (fieldRegexp == null) {
return fieldValueString;
}
// Check the event field against its regular expression
- if (!fieldValueString.matches(fieldRE)) {
+ if (!fieldValueString.matches(fieldRegexp)) {
throw new ApexEventRuntimeException(
- "field \"" + fieldName + "\" with value \"" + fieldValueString + "\" is invalid");
+ "field \"" + fieldName + "\" with value \"" + fieldValueString + "\" is invalid");
}
return fieldValueString;
@@ -395,14 +398,13 @@ public class Apex2JSONEventConverter implements ApexEventProtocolConverter {
*
* @param jsonObject the JSON object containing the JSON representation of the incoming event
* @param fieldName the field name to find in the event
- * @param fieldAlias the alias for the field to find in the event, overrides the field name if
- * it is not null
+ * @param fieldAlias the alias for the field to find in the event, overrides the field name if it is not null
* @param mandatory true if the field is mandatory
* @return the value of the field in the JSON object or null if the field is optional
* @throws ApexEventRuntimeException the apex event runtime exception
*/
- private JsonElement getJSONField(final JsonObject jsonObject, final String fieldName, final String fieldAlias,
- final boolean mandatory) {
+ private JsonElement getJsonField(final JsonObject jsonObject, final String fieldName, final String fieldAlias,
+ final boolean mandatory) {
// Check if we should use the alias for this field
String fieldToFind = fieldName;
@@ -431,7 +433,7 @@ public class Apex2JSONEventConverter implements ApexEventProtocolConverter {
* @return true if the field is present
* @throws ApexEventRuntimeException the apex event runtime exception
*/
- private boolean hasJSONField(final JsonObject jsonObject, final String fieldName) {
+ private boolean hasJsonField(final JsonObject jsonObject, final String fieldName) {
// check for the field
return jsonObject.has(fieldName);
}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/JSONEventProtocolParameters.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/JsonEventProtocolParameters.java
index 6efcceb43..8e44ec59b 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/JSONEventProtocolParameters.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/JsonEventProtocolParameters.java
@@ -41,7 +41,7 @@ import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolTextCh
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
-public class JSONEventProtocolParameters extends EventProtocolTextCharDelimitedParameters {
+public class JsonEventProtocolParameters extends EventProtocolTextCharDelimitedParameters {
/** The label of this event protocol. */
public static final String JSON_EVENT_PROTOCOL_LABEL = "JSON";
@@ -62,8 +62,8 @@ public class JSONEventProtocolParameters extends EventProtocolTextCharDelimitedP
* Constructor to create a JSON event protocol parameter instance and register the instance with
* the parameter service.
*/
- public JSONEventProtocolParameters() {
- this(JSONEventProtocolParameters.class.getCanonicalName(), JSON_EVENT_PROTOCOL_LABEL);
+ public JsonEventProtocolParameters() {
+ this(JsonEventProtocolParameters.class.getCanonicalName(), JSON_EVENT_PROTOCOL_LABEL);
}
/**
@@ -73,7 +73,7 @@ public class JSONEventProtocolParameters extends EventProtocolTextCharDelimitedP
* @param parameterClassName the class name of a sub class of this class
* @param eventProtocolLabel the name of the event protocol for this plugin
*/
- public JSONEventProtocolParameters(final String parameterClassName, final String eventProtocolLabel) {
+ public JsonEventProtocolParameters(final String parameterClassName, final String eventProtocolLabel) {
super(parameterClassName);
// Set the event protocol properties for the JSON event protocol
@@ -84,7 +84,7 @@ public class JSONEventProtocolParameters extends EventProtocolTextCharDelimitedP
this.setEndChar(JSON_TEXT_BLOCK_END_DELIMITER);
// Set the event protocol plugin class
- this.setEventProtocolPluginClass(Apex2JSONEventConverter.class.getCanonicalName());
+ this.setEventProtocolPluginClass(Apex2JsonEventConverter.class.getCanonicalName());
}
/* (non-Javadoc)
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexCommandLineArguments.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexCommandLineArguments.java
index d6d278ebf..bdbd82dc2 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexCommandLineArguments.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexCommandLineArguments.java
@@ -42,6 +42,8 @@ import org.onap.policy.common.utils.resources.ResourceUtils;
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
public class ApexCommandLineArguments {
+ // Recurring string constants
+ private static final String FILE_PREAMBLE = " file \"";
private static final int HELP_LINE_LENGTH = 120;
// Apache Commons CLI options
@@ -181,12 +183,11 @@ public class ApexCommandLineArguments {
* @return the help string
*/
public String help(final String mainClassName) {
- final HelpFormatter helpFormatter = new HelpFormatter();
final StringWriter stringWriter = new StringWriter();
- final PrintWriter stringPW = new PrintWriter(stringWriter);
+ final PrintWriter stringPrintWriter = new PrintWriter(stringWriter);
- helpFormatter.printHelp(stringPW, HELP_LINE_LENGTH, mainClassName + " [options...]", "options", options, 0, 0,
- "");
+ new HelpFormatter().printHelp(stringPrintWriter, HELP_LINE_LENGTH, mainClassName + " [options...]", "options",
+ options, 0, 0, "");
return stringWriter.toString();
}
@@ -268,20 +269,20 @@ public class ApexCommandLineArguments {
}
// The file name can refer to a resource on the local file system or on the class path
- final URL fileURL = ResourceUtils.getUrl4Resource(fileName);
- if (fileURL == null) {
- throw new ApexException(fileTag + " file \"" + fileName + "\" does not exist");
+ final URL fileUrl = ResourceUtils.getUrl4Resource(fileName);
+ if (fileUrl == null) {
+ throw new ApexException(fileTag + FILE_PREAMBLE + fileName + "\" does not exist");
}
- final File theFile = new File(fileURL.getPath());
+ final File theFile = new File(fileUrl.getPath());
if (!theFile.exists()) {
- throw new ApexException(fileTag + " file \"" + fileName + "\" does not exist");
+ throw new ApexException(fileTag + FILE_PREAMBLE + fileName + "\" does not exist");
}
if (!theFile.isFile()) {
- throw new ApexException(fileTag + " file \"" + fileName + "\" is not a normal file");
+ throw new ApexException(fileTag + FILE_PREAMBLE + fileName + "\" is not a normal file");
}
if (!theFile.canRead()) {
- throw new ApexException(fileTag + " file \"" + fileName + "\" is ureadable");
+ throw new ApexException(fileTag + FILE_PREAMBLE + fileName + "\" is ureadable");
}
}
}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventMarshaller.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventMarshaller.java
index 9904847aa..532fdb9c7 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventMarshaller.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventMarshaller.java
@@ -187,19 +187,19 @@ public class ApexEventMarshaller implements ApexEventListener, Runnable {
// Process the next Apex event from the queue
final Object event = converter.fromApexEvent(apexEvent);
- producer.sendEvent(apexEvent.getExecutionID(), apexEvent.getName(), event);
+ producer.sendEvent(apexEvent.getExecutionId(), apexEvent.getName(), event);
if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("event sent : " + apexEvent.toString());
+ String message = "event sent : " + apexEvent.toString();
+ LOGGER.trace(message);
}
} catch (final InterruptedException e) {
// restore the interrupt status
Thread.currentThread().interrupt();
LOGGER.debug("Thread interrupted, Reason {}", e.getMessage());
- break;
+ stopOrderedFlag = true;
} catch (final Exception e) {
LOGGER.warn("Error while forwarding events for " + marshallerThread.getName(), e);
- continue;
}
}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java
index 7b4188ea1..1d1b64e37 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java
@@ -44,8 +44,8 @@ import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
/**
- * This event unmarshaler handles events coming into Apex, handles threading, event queuing,
- * transformation and receiving using the configured receiving technology.
+ * This event unmarshaler handles events coming into Apex, handles threading, event queuing, transformation and
+ * receiving using the configured receiving technology.
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
@@ -88,7 +88,7 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
* @param consumerParameters the consumer parameters for this specific unmarshaler
*/
public ApexEventUnmarshaller(final String name, final EngineServiceParameters engineServiceParameters,
- final EventHandlerParameters consumerParameters) {
+ final EventHandlerParameters consumerParameters) {
this.name = name;
this.engineServiceParameters = engineServiceParameters;
this.consumerParameters = consumerParameters;
@@ -97,8 +97,7 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
/**
* Configure the consumer and initialize the thread for event sending.
*
- * @param incomingEngineServiceHandler the Apex engine service handler for passing events to
- * Apex
+ * @param incomingEngineServiceHandler the Apex engine service handler for passing events to Apex
* @throws ApexEventException on errors initializing event handling
*/
public void init(final ApexEngineServiceHandler incomingEngineServiceHandler) throws ApexEventException {
@@ -119,8 +118,8 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
consumer.start();
// Configure and start the event reception thread
- final String threadName =
- engineServiceParameters.getEngineKey().getName() + ":" + this.getClass().getName() + ":" + name;
+ final String threadName = engineServiceParameters.getEngineKey().getName() + ":" + this.getClass().getName()
+ + ":" + name;
unmarshallerThread = new ApplicationThreadFactory(threadName).newThread(this);
unmarshallerThread.setDaemon(true);
unmarshallerThread.start();
@@ -165,7 +164,7 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
// To connect a synchronous unmarshaler and marshaler, we create a synchronous event
// cache on the consumer/producer pair
new SynchronousEventCache(peeredMode, consumer, peeredMarshaller.getProducer(),
- consumerParameters.getPeerTimeout(EventHandlerPeeredMode.SYNCHRONOUS));
+ consumerParameters.getPeerTimeout(EventHandlerPeeredMode.SYNCHRONOUS));
return;
case REQUESTOR:
@@ -180,8 +179,7 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
/*
* (non-Javadoc)
*
- * @see
- * org.onap.policy.apex.service.engine.event.ApexEventReceiver#receiveEvent(java.lang.Object)
+ * @see org.onap.policy.apex.service.engine.event.ApexEventReceiver#receiveEvent(java.lang.Object)
*/
@Override
public void receiveEvent(final Object event) throws ApexEventException {
@@ -191,8 +189,7 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
/*
* (non-Javadoc)
*
- * @see org.onap.policy.apex.service.engine.event.ApexEventReceiver#receiveEvent(long,
- * java.lang.Object)
+ * @see org.onap.policy.apex.service.engine.event.ApexEventReceiver#receiveEvent(long, java.lang.Object)
*/
@Override
public void receiveEvent(final long executionId, final Object event) throws ApexEventException {
@@ -204,15 +201,15 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
*
* @param executionId the execution id the incoming execution ID
* @param event the event in its native format
- * @param generateExecutionId if true, let Apex generate the execution ID, if false, use the
- * incoming execution ID
+ * @param generateExecutionId if true, let Apex generate the execution ID, if false, use the incoming execution ID
* @throws ApexEventException on unmarshaling errors on events
*/
private void receiveEvent(final long executionId, final Object event, final boolean generateExecutionId)
- throws ApexEventException {
+ throws ApexEventException {
// Push the event onto the queue
if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("onMessage(): event received: {}", event.toString());
+ String eventString = "onMessage(): event received: " + event.toString();
+ LOGGER.trace(eventString);
}
// Convert the incoming events to Apex events
@@ -222,10 +219,10 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
// Check if we are filtering events on this unmarshaler, if so check the event name
// against the filter
if (consumerParameters.isSetEventNameFilter()
- && !apexEvent.getName().matches(consumerParameters.getEventNameFilter())) {
+ && !apexEvent.getName().matches(consumerParameters.getEventNameFilter())) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("onMessage(): event {} not processed, filtered out by filter", apexEvent,
- consumerParameters.getEventNameFilter());
+ consumerParameters.getEventNameFilter());
}
// Ignore this event
@@ -233,7 +230,7 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
}
if (!generateExecutionId) {
- apexEvent.setExecutionID(executionId);
+ apexEvent.setExecutionId(executionId);
}
// Enqueue the event
@@ -241,22 +238,22 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
// Cache synchronized events that are sent
if (consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)) {
- final SynchronousEventCache synchronousEventCache =
- (SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS);
- synchronousEventCache.cacheSynchronizedEventToApex(apexEvent.getExecutionID(), apexEvent);
+ final SynchronousEventCache synchronousEventCache = (SynchronousEventCache) consumer
+ .getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS);
+ synchronousEventCache.cacheSynchronizedEventToApex(apexEvent.getExecutionId(), apexEvent);
}
}
} catch (final ApexException e) {
final String errorMessage = "Error while converting event into an ApexEvent for " + name + ": "
- + e.getMessage() + ", Event=" + event;
+ + e.getMessage() + ", Event=" + event;
LOGGER.warn(errorMessage, e);
throw new ApexEventException(errorMessage, e);
}
}
/**
- * Run a thread that runs forever (well until system termination anyway) and listens for
- * incoming events on the queue.
+ * Run a thread that runs forever (well until system termination anyway) and listens for incoming events on the
+ * queue.
*/
@Override
public void run() {
@@ -270,7 +267,8 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
}
if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("event received {}", apexEvent.toString());
+ String message = apexEvent.toString();
+ LOGGER.trace("event received {}", message);
}
// Pass the event to the activator for forwarding to Apex
@@ -279,10 +277,9 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
// restore the interrupt status
Thread.currentThread().interrupt();
LOGGER.warn("BatchProcessor thread interrupted, Reason {}", e.getMessage());
- break;
+ stopOrderedFlag = true;
} catch (final Exception e) {
LOGGER.warn("Error while forwarding events for " + unmarshallerThread.getName(), e);
- continue;
}
}
@@ -309,10 +306,9 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
stopOrderedFlag = true;
// Order a stop on the synchronous cache if one exists
- if (consumerParameters != null && consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)) {
- if (consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS) != null) {
- ((SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)).stop();
- }
+ if (consumerParameters != null && consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)
+ && consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS) != null) {
+ ((SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)).stop();
}
// Wait for thread shutdown
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java
index 436225fc4..2b15b145f 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java
@@ -50,7 +50,6 @@ public class ApexMain {
* @param args the commaind line arguments
*/
public ApexMain(final String[] args) {
- System.out.println("Starting Apex service with parameters " + Arrays.toString(args) + " . . .");
LOGGER.entry("Starting Apex service with parameters " + Arrays.toString(args) + " . . .");
// Check the arguments
@@ -60,16 +59,13 @@ public class ApexMain {
final String argumentMessage = arguments.parse(args);
if (argumentMessage != null) {
LOGGER.info(argumentMessage);
- System.out.println(argumentMessage);
return;
}
// Validate that the arguments are sane
arguments.validate();
} catch (final ApexException e) {
- System.err.println("start of Apex service failed: " + e.getMessage());
LOGGER.error("start of Apex service failed", e);
- System.err.println(arguments.help(ApexMain.class.getCanonicalName()));
return;
}
@@ -77,7 +73,6 @@ public class ApexMain {
try {
parameters = new ApexParameterHandler().getParameters(arguments);
} catch (final Exception e) {
- System.err.println("start of Apex service failed\n" + e.getMessage());
LOGGER.error("start of Apex service failed", e);
return;
}
@@ -103,8 +98,6 @@ public class ApexMain {
try {
activator.initialize();
} catch (final ApexActivatorException e) {
- System.err.println("start of Apex service failed, used parameters are " + Arrays.toString(args));
- e.printStackTrace(System.err);
LOGGER.error("start of Apex service failed, used parameters are " + Arrays.toString(args), e);
return;
}
@@ -112,7 +105,6 @@ public class ApexMain {
// Add a shutdown hook to shut everything down in an orderly manner
Runtime.getRuntime().addShutdownHook(new ApexMainShutdownHookClass());
LOGGER.exit("Started Apex");
- System.out.println("Started Apex service");
}
/**
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/EngineService.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/EngineService.java
index 1af0c9d1c..ef17a8eab 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/EngineService.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/EngineService.java
@@ -101,7 +101,7 @@ public interface EngineService {
void updateModel(AxArtifactKey engineServiceKey, AxPolicyModel apexModel, boolean forceFlag) throws ApexException;
/**
- * This method returns the state of an engine service or engine.
+ * Return the state of an engine service or engine.
*
* @return The engine service or engine state
*/
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java
index 04fb8e389..c99987542 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java
@@ -63,6 +63,10 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineServiceImpl.class);
private static final boolean DEBUG_ENABLED = LOGGER.isDebugEnabled();
+ // Recurring string constants
+ private static final String ENGINE_KEY_PREAMBLE = "engine with key ";
+ private static final String NOT_FOUND_SUFFIX = " not found in engine service";
+
// Constants for timing
private static final long MAX_START_WAIT_TIME = 5000; // 5 seconds
private static final long MAX_STOP_WAIT_TIME = 5000; // 5 seconds
@@ -96,7 +100,7 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
* @throws ApexException on worker instantiation errors
*/
private EngineServiceImpl(final AxArtifactKey engineServiceKey, final int incomingThreadCount,
- final long periodicEventPeriod) throws ApexException {
+ final long periodicEventPeriod) {
LOGGER.entry(engineServiceKey, incomingThreadCount);
this.engineServiceKey = engineServiceKey;
@@ -121,26 +125,6 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
}
/**
- * Create an Apex Engine Service instance. This method is deprecated and will be removed in the next version.
- *
- * @param engineServiceKey the engine service key
- * @param threadCount the thread count, the number of engine workers to start
- * @return the Engine Service instance
- * @throws ApexException on worker instantiation errors
- * @deprecated Do not use this version. Use {@link #create(EngineServiceParameters)}
- */
- @Deprecated
- public static EngineServiceImpl create(final AxArtifactKey engineServiceKey, final int threadCount)
- throws ApexException {
- // Check if the Apex model specified is sane
- if (engineServiceKey == null) {
- LOGGER.warn("engine service key is null");
- throw new ApexException("engine service key is null");
- }
- return new EngineServiceImpl(engineServiceKey, threadCount, 0);
- }
-
- /**
* Create an Apex Engine Service instance. This method does not load the policy so
* {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} or
* {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} must be used to load a model. This method does not
@@ -266,10 +250,10 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
final boolean forceFlag) throws ApexException {
// Check if the Apex model specified is sane
if (apexModelString == null || apexModelString.trim().length() == 0) {
- LOGGER.warn("model for updating on engine service with key " + incomingEngineServiceKey.getId()
- + " is empty");
- throw new ApexException("model for updating on engine service with key " + incomingEngineServiceKey.getId()
- + " is empty");
+ String emptyModelMessage = "model for updating engine service with key "
+ + incomingEngineServiceKey.getId() + " is empty";
+ LOGGER.warn(emptyModelMessage);
+ throw new ApexException(emptyModelMessage);
}
// Read the Apex model into memory using the Apex Model Reader
@@ -278,15 +262,15 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
apexPolicyModel = modelReader.read(new ByteArrayInputStream(apexModelString.getBytes()));
} catch (final ApexModelException e) {
- LOGGER.error("failed to unmarshal the apex model on engine service " + incomingEngineServiceKey.getId(), e);
- throw new ApexException(
- "failed to unmarshal the apex model on engine service " + incomingEngineServiceKey.getId(),
- e);
+ String message = "failed to unmarshal the apex model on engine service " + incomingEngineServiceKey.getId();
+ LOGGER.error(message, e);
+ throw new ApexException(message, e);
}
if (apexPolicyModel == null) {
- LOGGER.error("apex model null on engine service " + incomingEngineServiceKey.getId());
- throw new ApexException("apex model null on engine service " + incomingEngineServiceKey.getId());
+ String message = "apex model null on engine service " + incomingEngineServiceKey.getId();
+ LOGGER.error(message);
+ throw new ApexException(message);
}
// Update the model
@@ -327,43 +311,12 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
// The current policy model may or may not be defined
final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
- if (forceFlag) {
- LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId()
- + "\" is not a compatible model update from the existing engine model with key \""
- + currentModel.getKey().getId() + "\"");
- } else {
- throw new ContextException("apex model update failed, supplied model with key \""
- + apexModel.getKey().getId()
- + "\" is not a compatible model update from the existing engine model with key \""
- + currentModel.getKey().getId() + "\"");
- }
+ handleIncompatibility(apexModel, forceFlag, currentModel);
}
}
if (!isStopped()) {
- // Stop all engines on this engine service
- stop();
- final long stoptime = System.currentTimeMillis();
- while (!isStopped() && System.currentTimeMillis() - stoptime < MAX_STOP_WAIT_TIME) {
- ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
- }
- // Check if all engines are stopped
- final StringBuilder notStoppedEngineIdBuilder = new StringBuilder();
- for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
- if (engineWorkerEntry.getValue().getState() != AxEngineState.STOPPED) {
- notStoppedEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
- notStoppedEngineIdBuilder.append('(');
- notStoppedEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
- notStoppedEngineIdBuilder.append(") ");
- }
- }
- if (notStoppedEngineIdBuilder.length() > 0) {
- final String errorString = "cannot update model on engine service with key "
- + incomingEngineServiceKey.getId() + ", engines not stopped after " + MAX_STOP_WAIT_TIME
- + "ms are: " + notStoppedEngineIdBuilder.toString().trim();
- LOGGER.warn(errorString);
- throw new ApexException(errorString);
- }
+ stopEngines(incomingEngineServiceKey);
}
// Update the engines
@@ -400,6 +353,58 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
LOGGER.exit();
}
+ /**
+ * Stop engines for a model update.
+ * @param incomingEngineServiceKey the engine service key for the engines that are to be stopped
+ * @throws ApexException on errors stopping engines
+ */
+ private void stopEngines(final AxArtifactKey incomingEngineServiceKey) throws ApexException {
+ // Stop all engines on this engine service
+ stop();
+ final long stoptime = System.currentTimeMillis();
+ while (!isStopped() && System.currentTimeMillis() - stoptime < MAX_STOP_WAIT_TIME) {
+ ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
+ }
+ // Check if all engines are stopped
+ final StringBuilder notStoppedEngineIdBuilder = new StringBuilder();
+ for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
+ if (engineWorkerEntry.getValue().getState() != AxEngineState.STOPPED) {
+ notStoppedEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
+ notStoppedEngineIdBuilder.append('(');
+ notStoppedEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
+ notStoppedEngineIdBuilder.append(") ");
+ }
+ }
+ if (notStoppedEngineIdBuilder.length() > 0) {
+ final String errorString = "cannot update model on engine service with key "
+ + incomingEngineServiceKey.getId() + ", engines not stopped after " + MAX_STOP_WAIT_TIME
+ + "ms are: " + notStoppedEngineIdBuilder.toString().trim();
+ LOGGER.warn(errorString);
+ throw new ApexException(errorString);
+ }
+ }
+
+ /**
+ * Issue compatibility warning or error message.
+ * @param apexModel The model name
+ * @param forceFlag true if we are forcing the update
+ * @param currentModel the existing model that is loaded
+ * @throws ContextException on compatibility errors
+ */
+ private void handleIncompatibility(final AxPolicyModel apexModel, final boolean forceFlag,
+ final AxPolicyModel currentModel) throws ContextException {
+ if (forceFlag) {
+ LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId()
+ + "\" is not a compatible model update from the existing engine model with key \""
+ + currentModel.getKey().getId() + "\"");
+ } else {
+ throw new ContextException("apex model update failed, supplied model with key \""
+ + apexModel.getKey().getId()
+ + "\" is not a compatible model update from the existing engine model with key \""
+ + currentModel.getKey().getId() + "\"");
+ }
+ }
+
/*
* (non-Javadoc)
*
@@ -446,8 +451,9 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
// Check if we have this key on our map
if (!engineWorkerMap.containsKey(engineKey)) {
- LOGGER.warn("engine with key " + engineKey.getId() + " not found in engine service");
- throw new ApexException("engine with key " + engineKey.getId() + " not found in engine service");
+ String message = ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX;
+ LOGGER.warn(message);
+ throw new ApexException(message);
}
// Start the engine
@@ -487,8 +493,8 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
// Check if we have this key on our map
if (!engineWorkerMap.containsKey(engineKey)) {
- LOGGER.warn("engine with key " + engineKey.getId() + " not found in engine service");
- throw new ApexException("engine with key " + engineKey.getId() + " not found in engine service");
+ LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
+ throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
}
// Stop the engine
@@ -528,8 +534,8 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
// Check if we have this key on our map
if (!engineWorkerMap.containsKey(engineKey)) {
- LOGGER.warn("engine with key " + engineKey.getId() + " not found in engine service");
- throw new ApexException("engine with key " + engineKey.getId() + " not found in engine service");
+ LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
+ throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
}
// Clear the engine
@@ -566,7 +572,7 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
public boolean isStarted(final AxArtifactKey engineKey) {
// Check if we have this key on our map
if (!engineWorkerMap.containsKey(engineKey)) {
- LOGGER.warn("engine with key " + engineKey.getId() + " not found in engine service");
+ LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
}
return engineWorkerMap.get(engineKey).isStarted();
}
@@ -597,7 +603,7 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
public boolean isStopped(final AxArtifactKey engineKey) {
// Check if we have this key on our map
if (!engineWorkerMap.containsKey(engineKey)) {
- LOGGER.warn("engine with key " + engineKey.getId() + " not found in engine service");
+ LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
}
return engineWorkerMap.get(engineKey).isStopped();
}
@@ -611,10 +617,10 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
public void startPeriodicEvents(final long period) throws ApexException {
// Check if periodic events are already started
if (periodicEventGenerator != null) {
- LOGGER.warn("Peiodic event geneation already running on engine " + engineServiceKey.getId() + ", "
- + periodicEventGenerator.toString());
- throw new ApexException("Peiodic event geneation already running on engine " + engineServiceKey.getId()
- + ", " + periodicEventGenerator.toString());
+ String message = "Peiodic event geneation already running on engine " + engineServiceKey.getId() + ", "
+ + periodicEventGenerator.toString();
+ LOGGER.warn(message);
+ throw new ApexException(message);
}
// Set up periodic event execution, its a Java Timer/TimerTask
@@ -653,8 +659,8 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
public String getStatus(final AxArtifactKey engineKey) throws ApexException {
// Check if we have this key on our map
if (!engineWorkerMap.containsKey(engineKey)) {
- LOGGER.warn("engine with key " + engineKey.getId() + " not found in engine service");
- throw new ApexException("engine with key " + engineKey.getId() + " not found in engine service");
+ LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
+ throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
}
// Return the information for this worker
@@ -671,8 +677,8 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
public String getRuntimeInfo(final AxArtifactKey engineKey) throws ApexException {
// Check if we have this key on our map
if (!engineWorkerMap.containsKey(engineKey)) {
- LOGGER.warn("engine with key " + engineKey.getId() + " not found in engine service");
- throw new ApexException("engine with key " + engineKey.getId() + " not found in engine service");
+ LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
+ throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
}
// Return the information for this worker
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java
index b9a405b44..dc5e91979 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java
@@ -75,6 +75,13 @@ final class EngineWorker implements EngineService {
// Logger for this class
private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineService.class);
+ // Recurring string constants
+ private static final String IS_NULL_SUFFIX = " is null";
+ private static final String ENGINE_FOR_KEY_PREFIX = "apex engine for engine key ";
+ private static final String ENGINE_SUFFIX = " of this engine";
+ private static final String BAD_KEY_MATCH_TAG = " does not match the key";
+ private static final String ENGINE_KEY_PREFIX = "engine key ";
+
// The ID of this engine
private final AxArtifactKey engineWorkerKey;
@@ -102,7 +109,7 @@ final class EngineWorker implements EngineService {
* @throws ApexException thrown on errors on worker instantiation
*/
EngineWorker(final AxArtifactKey engineWorkerKey, final BlockingQueue<ApexEvent> queue,
- final ApplicationThreadFactory threadFactory) throws ApexException {
+ final ApplicationThreadFactory threadFactory) {
LOGGER.entry(engineWorkerKey);
this.engineWorkerKey = engineWorkerKey;
@@ -252,10 +259,10 @@ final class EngineWorker implements EngineService {
// Check if the key on the update request is correct
if (!engineWorkerKey.equals(engineKey)) {
- LOGGER.warn("engine key " + engineKey.getId() + " does not match the key" + engineWorkerKey.getId()
- + " of this engine");
- throw new ApexException("engine key " + engineKey.getId() + " does not match the key"
- + engineWorkerKey.getId() + " of this engine");
+ String message = ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
+ + ENGINE_SUFFIX;
+ LOGGER.warn(message);
+ throw new ApexException(message);
}
// Sanity checks on the Apex model
@@ -323,23 +330,24 @@ final class EngineWorker implements EngineService {
// Check if the key on the start request is correct
if (!engineWorkerKey.equals(engineKey)) {
- LOGGER.warn("engine key " + engineKey.getId() + " does not match the key" + engineWorkerKey.getId()
- + " of this engine");
- throw new ApexException("engine key " + engineKey.getId() + " does not match the key"
- + engineWorkerKey.getId() + " of this engine");
+ LOGGER.warn(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
+ + ENGINE_SUFFIX);
+ throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG
+ + engineWorkerKey.getId() + ENGINE_SUFFIX);
}
if (engine == null) {
- LOGGER.error("apex engine for engine key" + engineWorkerKey.getId() + " null");
- throw new ApexException("apex engine for engine key" + engineWorkerKey.getId() + " null");
+ String message = ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is null";
+ LOGGER.error(message);
+ throw new ApexException(message);
}
// Starts the event processing thread that handles incoming events
if (processorThread != null && processorThread.isAlive()) {
- LOGGER.error("apex engine for engine key" + engineWorkerKey.getId() + " is already running with state "
- + getState());
- throw new ApexException("apex engine for engine key" + engineWorkerKey.getId()
- + " is already running with state " + getState());
+ String message = ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already running with state "
+ + getState();
+ LOGGER.error(message);
+ throw new ApexException(message);
}
// Start the engine
@@ -373,22 +381,23 @@ final class EngineWorker implements EngineService {
public void stop(final AxArtifactKey engineKey) throws ApexException {
// Check if the key on the start request is correct
if (!engineWorkerKey.equals(engineKey)) {
- LOGGER.warn("engine key " + engineKey.getId() + " does not match the key" + engineWorkerKey.getId()
- + " of this engine");
- throw new ApexException("engine key " + engineKey.getId() + " does not match the key"
- + engineWorkerKey.getId() + " of this engine");
+ LOGGER.warn(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
+ + ENGINE_SUFFIX);
+ throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG
+ + engineWorkerKey.getId() + ENGINE_SUFFIX);
}
if (engine == null) {
- LOGGER.error("apex engine for engine key" + engineWorkerKey.getId() + " null");
- throw new ApexException("apex engine for engine key" + engineWorkerKey.getId() + " null");
+ String message = ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is null";
+ LOGGER.error(message);
+ throw new ApexException(message);
}
// Interrupt the worker to stop its thread
if (processorThread == null || !processorThread.isAlive()) {
processorThread = null;
- LOGGER.warn("apex engine for engine key" + engineWorkerKey.getId() + " is already stopped with state "
+ LOGGER.warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already stopped with state "
+ getState());
return;
}
@@ -424,20 +433,20 @@ final class EngineWorker implements EngineService {
public void clear(final AxArtifactKey engineKey) throws ApexException {
// Check if the key on the start request is correct
if (!engineWorkerKey.equals(engineKey)) {
- LOGGER.warn("engine key " + engineKey.getId() + " does not match the key" + engineWorkerKey.getId()
- + " of this engine");
- throw new ApexException("engine key " + engineKey.getId() + " does not match the key"
- + engineWorkerKey.getId() + " of this engine");
+ LOGGER.warn(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
+ + ENGINE_SUFFIX);
+ throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG
+ + engineWorkerKey.getId() + ENGINE_SUFFIX);
}
if (engine == null) {
- LOGGER.error("apex engine for engine key" + engineWorkerKey.getId() + " null");
- throw new ApexException("apex engine for engine key" + engineWorkerKey.getId() + " null");
+ LOGGER.error(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + IS_NULL_SUFFIX);
+ throw new ApexException(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + IS_NULL_SUFFIX);
}
// Interrupt the worker to stop its thread
if (processorThread != null && !processorThread.isAlive()) {
- LOGGER.warn("apex engine for engine key" + engineWorkerKey.getId() + " is not stopped with state "
+ LOGGER.warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is not stopped with state "
+ getState());
return;
}
@@ -653,7 +662,7 @@ final class EngineWorker implements EngineService {
final JsonElement jsonElement = jsonParser.parse(runtimeJsonStringBuilder.toString());
final String tidiedRuntimeString = gson.toJson(jsonElement);
- LOGGER.debug("runtime information=" + tidiedRuntimeString);
+ LOGGER.debug("runtime information={}", tidiedRuntimeString);
return tidiedRuntimeString;
}
@@ -691,7 +700,8 @@ final class EngineWorker implements EngineService {
// Take events from the event processing queue of the worker and pass them to the engine
// for processing
- while (!processorThread.isInterrupted()) {
+ boolean stopFlag = false;
+ while (!processorThread.isInterrupted() && ! stopFlag) {
ApexEvent event = null;
try {
event = eventProcessingQueue.take();
@@ -714,7 +724,7 @@ final class EngineWorker implements EngineService {
LOGGER.warn("Engine {} failed to process event {}", engineWorkerKey, event.toString(), e);
} catch (final Exception e) {
LOGGER.warn("Engine {} terminated processing event {}", engineWorkerKey, event.toString(), e);
- break;
+ stopFlag = true;
}
}
LOGGER.debug("Engine {} completed processing", engineWorkerKey);