diff options
author | James Forsyth <jf2512@att.com> | 2018-07-10 13:26:03 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2018-07-10 13:26:03 +0000 |
commit | ac1830e63f1e93714f5bc8eef4239652a018070f (patch) | |
tree | ec06f239251e895a3950817177503e1f65d0152d | |
parent | bce8bc8383c3f19b64b360a941ec1b1503791c85 (diff) | |
parent | 76705d87ce3155fdf12f31e1391409cfa1361e61 (diff) |
Merge "change order in class for java convention"
-rw-r--r-- | champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java | 144 |
1 files changed, 69 insertions, 75 deletions
diff --git a/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java index 1f93a97..52ad560 100644 --- a/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java +++ b/champ-lib/champ-core/src/main/java/org/onap/aai/champcore/event/AbstractLoggingChampGraph.java @@ -37,7 +37,6 @@ import org.onap.aai.champcore.ChampGraph; import org.onap.aai.champcore.ChampTransaction; import org.onap.aai.champcore.event.ChampEvent.ChampOperation; import org.onap.aai.champcore.event.envelope.ChampEventEnvelope; -import org.onap.aai.champcore.event.envelope.ChampEventHeader; import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException; import org.onap.aai.champcore.exceptions.ChampMarshallingException; import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException; @@ -65,8 +64,76 @@ import org.slf4j.LoggerFactory; */ public abstract class AbstractLoggingChampGraph implements ChampGraph { + public static final String PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity"; + public static final Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000; + public static final String PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size"; + public static final Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5; + public static final String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher"; + + /** Pool of worker threads that do the work of publishing the events to the event bus. */ + protected ThreadPoolExecutor publisherPool; + + /** Client used for publishing events to the event bus. */ + protected EventPublisher producer; + + /** Internal queue where outgoing events will be buffered until they can be serviced by + * the event publisher worker threads. */ + protected BlockingQueue<ChampEvent> eventQueue; + + /** Number of events that can be queued up for publication before we begin dropping + * events. */ + private Integer eventQueueCapacity; + + /** Number of event publisher worker threads. */ + private Integer eventStreamPublisherPoolSize; + private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class); + + /** + * Create a new instance of the AbstractLoggingChampGraph. + * + * @param properties - Set of configuration properties for this graph instance. + */ + protected AbstractLoggingChampGraph(Map<String, Object> properties) { + + // Extract the necessary parameters from the configuration properties. + configure(properties); + + // Make sure we were passed an event producer as one of our properties, otherwise + // there is really nothing more we can do... + if(producer == null) { + logger.error("No event stream producer was supplied."); + logger.error("NOTE!! Champ events will NOT be published to the event stream!"); + return; + } + + // Create the blocking queue that we will use to buffer events that we want + // published to the event bus. + eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity); + + // Create the executor pool that will do the work of publishing events to the event bus. + publisherPool = + (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize, + new ProducerWorkerThreadFactory()); + + try { + + // Start up the producer worker threads. + for(int i=0; i<eventStreamPublisherPoolSize; i++) { + publisherPool.submit(new EventPublisherWorker()); + } + + } catch (Exception e) { + + logger.error("Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'"); + logger.error("NOTE!! Champ events may NOT be published to the event stream!"); + return; + } + } + + + public abstract Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException; public abstract Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException; public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams) throws ChampTransactionException; @@ -78,8 +145,8 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph { public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException; public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException; public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException; - public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException; + public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException; /** * Creates or updates a vertex in the graph data store. @@ -259,7 +326,6 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph { * @param index - The object index to be created/updated. */ public abstract void executeStoreObjectIndex(ChampObjectIndex index); - public abstract Optional<ChampObjectIndex> retrieveObjectIndex(String indexName); public abstract Stream<ChampObjectIndex> retrieveObjectIndices(); public abstract void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException; @@ -275,35 +341,6 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph { public abstract ChampCapabilities capabilities(); - - public final static String PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity"; - public final static Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000; - - public final static String PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size"; - public final static Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5; - - public final static String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher"; - - - - /** Number of events that can be queued up for publication before we begin dropping - * events. */ - private Integer eventQueueCapacity; - - /** Number of event publisher worker threads. */ - private Integer eventStreamPublisherPoolSize; - - /** Pool of worker threads that do the work of publishing the events to the event bus. */ - protected ThreadPoolExecutor publisherPool; - - /** Client used for publishing events to the event bus. */ - protected EventPublisher producer; - - /** Internal queue where outgoing events will be buffered until they can be serviced by - * the event publisher worker threads. */ - protected BlockingQueue<ChampEvent> eventQueue; - - /** * Thread factory for the event producer workers. */ @@ -318,49 +355,6 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph { /** - * Create a new instance of the AbstractLoggingChampGraph. - * - * @param properties - Set of configuration properties for this graph instance. - */ - protected AbstractLoggingChampGraph(Map<String, Object> properties) { - - // Extract the necessary parameters from the configuration properties. - configure(properties); - - // Make sure we were passed an event producer as one of our properties, otherwise - // there is really nothing more we can do... - if(producer == null) { - logger.error("No event stream producer was supplied."); - logger.error("NOTE!! Champ events will NOT be published to the event stream!"); - return; - } - - // Create the blocking queue that we will use to buffer events that we want - // published to the event bus. - eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity); - - // Create the executor pool that will do the work of publishing events to the event bus. - publisherPool = - (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize, - new ProducerWorkerThreadFactory()); - - try { - - // Start up the producer worker threads. - for(int i=0; i<eventStreamPublisherPoolSize; i++) { - publisherPool.submit(new EventPublisherWorker()); - } - - } catch (Exception e) { - - logger.error("Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'"); - logger.error("NOTE!! Champ events may NOT be published to the event stream!"); - return; - } - } - - - /** * Process the configuration properties supplied for this graph instance. * * @param properties - Configuration parameters. |