From 55f5c4dc9e130e48a25b048e1f3091b10c17e365 Mon Sep 17 00:00:00 2001 From: Jorge Hernandez Date: Thu, 10 Jan 2019 17:24:53 -0600 Subject: Adding NOOP sources support In addition, Noop* classes have been refactored to increase code reuse and clean some checkstyle issues. Additional Junits have been added for existing functionality. Change-Id: I072f9ff2f415630ac82eca949a8360249f73da86 Issue-ID: POLICY-1397 Signed-off-by: Jorge Hernandez --- .../common/endpoints/event/comm/TopicEndpoint.java | 493 ++------------------- 1 file changed, 37 insertions(+), 456 deletions(-) (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java') diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java index f4080b29..52c1f07b 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java @@ -1,8 +1,8 @@ /* * ============LICENSE_START======================================================= - * policy-endpoints + * ONAP * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,21 +20,16 @@ package org.onap.policy.common.endpoints.event.comm; -import com.fasterxml.jackson.annotation.JsonIgnore; - -import java.util.ArrayList; import java.util.List; import java.util.Properties; - import org.onap.policy.common.capabilities.Lockable; import org.onap.policy.common.capabilities.Startable; import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Abstraction to managed the system's Networked Topic Endpoints, sources of all events input into @@ -45,7 +40,7 @@ public interface TopicEndpoint extends Startable, Lockable { /** * singleton for global access. */ - public static final TopicEndpoint manager = new ProxyTopicEndpointManager(); + TopicEndpoint manager = new TopicEndpointProxy(); /** * Add Topic Sources to the communication infrastructure initialized per properties. @@ -54,7 +49,7 @@ public interface TopicEndpoint extends Startable, Lockable { * @return a generic Topic Source * @throws IllegalArgumentException when invalid arguments are provided */ - public List addTopicSources(Properties properties); + List addTopicSources(Properties properties); /** * Add Topic Sinks to the communication infrastructure initialized per properties. @@ -63,7 +58,7 @@ public interface TopicEndpoint extends Startable, Lockable { * @return a generic Topic Sink * @throws IllegalArgumentException when invalid arguments are provided */ - public List addTopicSinks(Properties properties); + List addTopicSinks(Properties properties); /** * Gets all Topic Sources. @@ -81,7 +76,7 @@ public interface TopicEndpoint extends Startable, Lockable { * @throws IllegalStateException if the entity is in an invalid state * @throws IllegalArgumentException if invalid parameters are present */ - public List getTopicSources(List topicNames); + List getTopicSources(List topicNames); /** * Gets the Topic Source for the given topic name and underlying communication infrastructure @@ -96,7 +91,7 @@ public interface TopicEndpoint extends Startable, Lockable { * @throws IllegalArgumentException if invalid parameters are present * @throws UnsupportedOperationException if the operation is not supported. */ - public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName); + TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName); /** * Get the UEB Topic Source for the given topic name. @@ -108,7 +103,7 @@ public interface TopicEndpoint extends Startable, Lockable { * TopicReaders for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public UebTopicSource getUebTopicSource(String topicName); + UebTopicSource getUebTopicSource(String topicName); /** * Get the DMAAP Topic Source for the given topic name. @@ -120,7 +115,15 @@ public interface TopicEndpoint extends Startable, Lockable { * TopicReaders for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public DmaapTopicSource getDmaapTopicSource(String topicName); + DmaapTopicSource getDmaapTopicSource(String topicName); + + /** + * Get the Noop Source for the given topic name. + * + * @param topicName the topic name. + * @return the Noop Source. + */ + NoopTopicSource getNoopTopicSource(String topicName); /** * Get the Topic Sinks for the given topic name. @@ -128,7 +131,7 @@ public interface TopicEndpoint extends Startable, Lockable { * @param topicNames the topic names * @return the Topic Sink List */ - public List getTopicSinks(List topicNames); + List getTopicSinks(List topicNames); /** * Get the Topic Sinks for the given topic name and all the underlying communication @@ -141,14 +144,14 @@ public interface TopicEndpoint extends Startable, Lockable { * TopicWriters for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public List getTopicSinks(String topicName); + List getTopicSinks(String topicName); /** * Gets all Topic Sinks. * * @return the Topic Sink List */ - public List getTopicSinks(); + List getTopicSinks(); /** * Get the Topic Sinks for the given topic name and underlying communication infrastructure type. @@ -161,7 +164,7 @@ public interface TopicEndpoint extends Startable, Lockable { * TopicWriters for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName); + TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName); /** * Get the UEB Topic Source for the given topic name. @@ -173,7 +176,7 @@ public interface TopicEndpoint extends Startable, Lockable { * TopicReaders for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public UebTopicSink getUebTopicSink(String topicName); + UebTopicSink getUebTopicSink(String topicName); /** * Get the no-op Topic Sink for the given topic name. @@ -185,7 +188,7 @@ public interface TopicEndpoint extends Startable, Lockable { * TopicReaders for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public NoopTopicSink getNoopTopicSink(String topicName); + NoopTopicSink getNoopTopicSink(String topicName); /** * Get the DMAAP Topic Source for the given topic name. @@ -197,469 +200,47 @@ public interface TopicEndpoint extends Startable, Lockable { * TopicReaders for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - public DmaapTopicSink getDmaapTopicSink(String topicName); + DmaapTopicSink getDmaapTopicSink(String topicName); /** * Gets only the UEB Topic Sources. * * @return the UEB Topic Source List */ - public List getUebTopicSources(); + List getUebTopicSources(); /** * Gets only the DMAAP Topic Sources. * * @return the DMAAP Topic Source List */ - public List getDmaapTopicSources(); + List getDmaapTopicSources(); + + /** + * Gets only the NOOP Topic Sources. + * + * @return the NOOP Topic Source List + */ + List getNoopTopicSources(); /** * Gets only the UEB Topic Sinks. * * @return the UEB Topic Sink List */ - public List getUebTopicSinks(); + List getUebTopicSinks(); /** * Gets only the DMAAP Topic Sinks. * * @return the DMAAP Topic Sink List */ - public List getDmaapTopicSinks(); + List getDmaapTopicSinks(); /** * Gets only the NOOP Topic Sinks. * * @return the NOOP Topic Sinks List */ - public List getNoopTopicSinks(); -} - - -/* - * ----------------- implementation ------------------- - */ - -/** - * This implementation of the Topic Endpoint Manager, proxies operations to appropriate - * implementations according to the communication infrastructure that are supported. - */ -class ProxyTopicEndpointManager implements TopicEndpoint { - /** - * Logger. - */ - private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class); - /** - * Is this element locked boolean. - */ - protected volatile boolean locked = false; - - /** - * Is this element alive boolean. - */ - protected volatile boolean alive = false; - - @Override - public List addTopicSources(Properties properties) { - - // 1. Create UEB Sources - // 2. Create DMAAP Sources - - final List sources = new ArrayList<>(); - - sources.addAll(UebTopicSource.factory.build(properties)); - sources.addAll(DmaapTopicSource.factory.build(properties)); - - if (this.isLocked()) { - for (final TopicSource source : sources) { - source.lock(); - } - } - - return sources; - } - - @Override - public List addTopicSinks(Properties properties) { - // 1. Create UEB Sinks - // 2. Create DMAAP Sinks - - final List sinks = new ArrayList<>(); - - sinks.addAll(UebTopicSink.factory.build(properties)); - sinks.addAll(DmaapTopicSink.factory.build(properties)); - sinks.addAll(NoopTopicSink.factory.build(properties)); - - if (this.isLocked()) { - for (final TopicSink sink : sinks) { - sink.lock(); - } - } - - return sinks; - } - - @Override - public List getTopicSources() { - - final List sources = new ArrayList<>(); - - sources.addAll(UebTopicSource.factory.inventory()); - sources.addAll(DmaapTopicSource.factory.inventory()); - - return sources; - } - - @Override - public List getTopicSources(List topicNames) { - - if (topicNames == null) { - throw new IllegalArgumentException("must provide a list of topics"); - } - - final List sources = new ArrayList<>(); - for (final String topic : topicNames) { - try { - final TopicSource uebSource = this.getUebTopicSource(topic); - if (uebSource != null) { - sources.add(uebSource); - } - } catch (final Exception e) { - logger.debug("No UEB source for topic: {}", topic, e); - } - - try { - final TopicSource dmaapSource = this.getDmaapTopicSource(topic); - if (dmaapSource != null) { - sources.add(dmaapSource); - } - } catch (final Exception e) { - logger.debug("No DMAAP source for topic: {}", topic, e); - } - } - return sources; - } - - @Override - public List getTopicSinks() { - - final List sinks = new ArrayList<>(); - - sinks.addAll(UebTopicSink.factory.inventory()); - sinks.addAll(DmaapTopicSink.factory.inventory()); - sinks.addAll(NoopTopicSink.factory.inventory()); - - return sinks; - } - - @Override - public List getTopicSinks(List topicNames) { - - if (topicNames == null) { - throw new IllegalArgumentException("must provide a list of topics"); - } - - final List sinks = new ArrayList<>(); - for (final String topic : topicNames) { - try { - final TopicSink uebSink = this.getUebTopicSink(topic); - if (uebSink != null) { - sinks.add(uebSink); - } - } catch (final Exception e) { - logger.debug("No UEB sink for topic: {}", topic, e); - } - - try { - final TopicSink dmaapSink = this.getDmaapTopicSink(topic); - if (dmaapSink != null) { - sinks.add(dmaapSink); - } - } catch (final Exception e) { - logger.debug("No DMAAP sink for topic: {}", topic, e); - } - - try { - final TopicSink noopSink = this.getNoopTopicSink(topic); - if (noopSink != null) { - sinks.add(noopSink); - } - } catch (final Exception e) { - logger.debug("No NOOP sink for topic: {}", topic, e); - } - } - return sinks; - } - - @Override - public List getTopicSinks(String topicName) { - if (topicName == null) { - throw parmException(topicName); - } - - final List sinks = new ArrayList<>(); - - try { - sinks.add(this.getUebTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - try { - sinks.add(this.getDmaapTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - try { - sinks.add(this.getNoopTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - return sinks; - } - - @JsonIgnore - @Override - public List getUebTopicSources() { - return UebTopicSource.factory.inventory(); - } - - @JsonIgnore - @Override - public List getDmaapTopicSources() { - return DmaapTopicSource.factory.inventory(); - } - - @JsonIgnore - @Override - public List getUebTopicSinks() { - return UebTopicSink.factory.inventory(); - } - - @JsonIgnore - @Override - public List getDmaapTopicSinks() { - return DmaapTopicSink.factory.inventory(); - } - - @JsonIgnore - @Override - public List getNoopTopicSinks() { - return NoopTopicSink.factory.inventory(); - } - - @Override - public boolean start() { - - synchronized (this) { - if (this.locked) { - throw new IllegalStateException(this + " is locked"); - } - - if (this.alive) { - return true; - } - - this.alive = true; - } - - final List endpoints = this.getEndpoints(); - - boolean success = true; - for (final Startable endpoint : endpoints) { - try { - success = endpoint.start() && success; - } catch (final Exception e) { - success = false; - logger.error("Problem starting endpoint: {}", endpoint, e); - } - } - - return success; - } - - - @Override - public boolean stop() { - - /* - * stop regardless if it is locked, in other words, stop operation has precedence over - * locks. - */ - synchronized (this) { - this.alive = false; - } - - final List endpoints = this.getEndpoints(); - - boolean success = true; - for (final Startable endpoint : endpoints) { - try { - success = endpoint.stop() && success; - } catch (final Exception e) { - success = false; - logger.error("Problem stopping endpoint: {}", endpoint, e); - } - } - - return success; - } - - /** - * Gets the endpoints. - * - * @return list of managed endpoints - */ - @JsonIgnore - protected List getEndpoints() { - final List endpoints = new ArrayList<>(); - - endpoints.addAll(this.getTopicSources()); - endpoints.addAll(this.getTopicSinks()); - - return endpoints; - } - - @Override - public void shutdown() { - UebTopicSource.factory.destroy(); - UebTopicSink.factory.destroy(); - NoopTopicSink.factory.destroy(); - - DmaapTopicSource.factory.destroy(); - DmaapTopicSink.factory.destroy(); - } - - @Override - public boolean isAlive() { - return this.alive; - } - - @Override - public boolean lock() { - - synchronized (this) { - if (this.locked) { - return true; - } - - this.locked = true; - } - - for (final TopicSource source : this.getTopicSources()) { - source.lock(); - } - - for (final TopicSink sink : this.getTopicSinks()) { - sink.lock(); - } - - return true; - } - - @Override - public boolean unlock() { - synchronized (this) { - if (!this.locked) { - return true; - } - - this.locked = false; - } - - for (final TopicSource source : this.getTopicSources()) { - source.unlock(); - } - - for (final TopicSink sink : this.getTopicSinks()) { - sink.unlock(); - } - - return true; - } - - @Override - public boolean isLocked() { - return this.locked; - } - - @Override - public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) { - - if (commType == null) { - throw parmException(topicName); - } - - if (topicName == null) { - throw parmException(topicName); - } - - switch (commType) { - case UEB: - return this.getUebTopicSource(topicName); - case DMAAP: - return this.getDmaapTopicSource(topicName); - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } - } - - private IllegalArgumentException parmException(String topicName) { - return new IllegalArgumentException( - "Invalid parameter: a communication infrastructure required to fetch " + topicName); - } - - @Override - public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) { - if (commType == null) { - throw parmException(topicName); - } - - if (topicName == null) { - throw parmException(topicName); - } - - switch (commType) { - case UEB: - return this.getUebTopicSink(topicName); - case DMAAP: - return this.getDmaapTopicSink(topicName); - case NOOP: - return this.getNoopTopicSink(topicName); - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } - } - - private void logNoSink(String topicName, Exception ex) { - logger.debug("No sink for topic: {}", topicName, ex); - } - - @Override - public UebTopicSource getUebTopicSource(String topicName) { - return UebTopicSource.factory.get(topicName); - } - - @Override - public UebTopicSink getUebTopicSink(String topicName) { - return UebTopicSink.factory.get(topicName); - } - - @Override - public DmaapTopicSource getDmaapTopicSource(String topicName) { - return DmaapTopicSource.factory.get(topicName); - } - - @Override - public DmaapTopicSink getDmaapTopicSink(String topicName) { - return DmaapTopicSink.factory.get(topicName); - } - - @Override - public NoopTopicSink getNoopTopicSink(String topicName) { - return NoopTopicSink.factory.get(topicName); - } - + List getNoopTopicSinks(); } -- cgit 1.2.3-korg