diff options
Diffstat (limited to 'policy-endpoints')
4 files changed, 1374 insertions, 1364 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java index cc3705ee..09ee9a4e 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -29,214 +29,217 @@ import org.onap.policy.drools.event.comm.bus.DmaapTopicSource; import org.onap.policy.drools.event.comm.bus.NoopTopicSink; import org.onap.policy.drools.event.comm.bus.UebTopicSink; import org.onap.policy.drools.event.comm.bus.UebTopicSource; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; import org.onap.policy.drools.properties.Lockable; import org.onap.policy.drools.properties.Startable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.fasterxml.jackson.annotation.JsonIgnore; /** - * Abstraction to managed the system's Networked Topic Endpoints, - * sources of all events input into the System. + * Abstraction to managed the system's Networked Topic Endpoints, sources of all events input into + * the System. */ public interface TopicEndpoint extends Startable, Lockable { - - /** - * Add Topic Sources to the communication infrastructure initialized per - * properties - * - * @param properties properties for Topic Source construction - * @return a generic Topic Source - * @throws IllegalArgumentException when invalid arguments are provided - */ - public List<TopicSource> addTopicSources(Properties properties); - - /** - * Add Topic Sinks to the communication infrastructure initialized per - * properties - * - * @param properties properties for Topic Sink construction - * @return a generic Topic Sink - * @throws IllegalArgumentException when invalid arguments are provided - */ - public List<TopicSink> addTopicSinks(Properties properties); - - /** - * gets all Topic Sources - * @return the Topic Source List - */ - List<TopicSource> getTopicSources(); - - /** - * get the Topic Sources for the given topic name - * - * @param topicName the topic name - * - * @return the Topic Source List - * @throws IllegalStateException if the entity is in an invalid state - * @throws IllegalArgumentException if invalid parameters are present - */ - public List<TopicSource> getTopicSources(List<String> topicNames); - - /** - * gets the Topic Source for the given topic name and - * underlying communication infrastructure type - * - * @param commType communication infrastructure type - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - * @throws UnsupportedOperationException if the operation is not supported. - */ - public TopicSource getTopicSource(Topic.CommInfrastructure commType, - String topicName) - throws UnsupportedOperationException; - - /** - * get the UEB Topic Source for the given topic name - * - * @param topicName the topic name - * - * @return the UEB Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public UebTopicSource getUebTopicSource(String topicName); - - /** - * get the DMAAP Topic Source for the given topic name - * - * @param topicName the topic name - * - * @return the DMAAP Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public DmaapTopicSource getDmaapTopicSource(String topicName); - - /** - * get the Topic Sinks for the given topic name - * - * @param topicNames the topic names - * @return the Topic Sink List - * @throws IllegalStateException - * @throws IllegalArgumentException - */ - public List<TopicSink> getTopicSinks(List<String> topicNames); - - /** - * get the Topic Sinks for the given topic name and - * underlying communication infrastructure type - * - * @param topicName the topic name - * @param commType communication infrastructure type - * - * @return the Topic Sink List - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicWriters for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public TopicSink getTopicSink(Topic.CommInfrastructure commType, - String topicName) - throws UnsupportedOperationException; - - /** - * get the Topic Sinks for the given topic name and - * all the underlying communication infrastructure type - * - * @param topicName the topic name - * @param commType communication infrastructure type - * - * @return the Topic Sink List - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicWriters for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public List<TopicSink> getTopicSinks(String topicName); - - /** - * get the UEB Topic Source for the given topic name - * - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public UebTopicSink getUebTopicSink(String topicName); - - /** - * get the no-op Topic Sink for the given topic name - * - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public NoopTopicSink getNoopTopicSink(String topicName); - - /** - * get the DMAAP Topic Source for the given topic name - * - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public DmaapTopicSink getDmaapTopicSink(String topicName); - - /** - * gets only the UEB Topic Sources - * @return the UEB Topic Source List - */ - public List<UebTopicSource> getUebTopicSources(); - - /** - * gets only the DMAAP Topic Sources - * @return the DMAAP Topic Source List - */ - public List<DmaapTopicSource> getDmaapTopicSources(); - - /** - * gets all Topic Sinks - * @return the Topic Sink List - */ - public List<TopicSink> getTopicSinks(); - - /** - * gets only the UEB Topic Sinks - * @return the UEB Topic Sink List - */ - public List<UebTopicSink> getUebTopicSinks(); - - /** - * gets only the DMAAP Topic Sinks - * @return the DMAAP Topic Sink List - */ - public List<DmaapTopicSink> getDmaapTopicSinks(); - - /** - * gets only the NOOP Topic Sinks - * @return the NOOP Topic Sinks List - */ - public List<NoopTopicSink> getNoopTopicSinks(); - - /** - * singleton for global access - */ - public static final TopicEndpoint manager = new ProxyTopicEndpointManager(); + + /** + * Add Topic Sources to the communication infrastructure initialized per properties + * + * @param properties properties for Topic Source construction + * @return a generic Topic Source + * @throws IllegalArgumentException when invalid arguments are provided + */ + public List<TopicSource> addTopicSources(Properties properties); + + /** + * Add Topic Sinks to the communication infrastructure initialized per properties + * + * @param properties properties for Topic Sink construction + * @return a generic Topic Sink + * @throws IllegalArgumentException when invalid arguments are provided + */ + public List<TopicSink> addTopicSinks(Properties properties); + + /** + * gets all Topic Sources + * + * @return the Topic Source List + */ + List<TopicSource> getTopicSources(); + + /** + * get the Topic Sources for the given topic name + * + * @param topicName the topic name + * + * @return the Topic Source List + * @throws IllegalStateException if the entity is in an invalid state + * @throws IllegalArgumentException if invalid parameters are present + */ + public List<TopicSource> getTopicSources(List<String> topicNames); + + /** + * gets the Topic Source for the given topic name and underlying communication infrastructure type + * + * @param commType communication infrastructure type + * @param topicName the topic name + * + * @return the Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + * @throws UnsupportedOperationException if the operation is not supported. + */ + public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) + throws UnsupportedOperationException; + + /** + * get the UEB Topic Source for the given topic name + * + * @param topicName the topic name + * + * @return the UEB Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public UebTopicSource getUebTopicSource(String topicName); + + /** + * get the DMAAP Topic Source for the given topic name + * + * @param topicName the topic name + * + * @return the DMAAP Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSource getDmaapTopicSource(String topicName); + + /** + * get the Topic Sinks for the given topic name + * + * @param topicNames the topic names + * @return the Topic Sink List + * @throws IllegalStateException + * @throws IllegalArgumentException + */ + public List<TopicSink> getTopicSinks(List<String> topicNames); + + /** + * get the Topic Sinks for the given topic name and underlying communication infrastructure type + * + * @param topicName the topic name + * @param commType communication infrastructure type + * + * @return the Topic Sink List + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicWriters for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) + throws UnsupportedOperationException; + + /** + * get the Topic Sinks for the given topic name and all the underlying communication + * infrastructure type + * + * @param topicName the topic name + * @param commType communication infrastructure type + * + * @return the Topic Sink List + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicWriters for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public List<TopicSink> getTopicSinks(String topicName); + + /** + * get the UEB Topic Source for the given topic name + * + * @param topicName the topic name + * + * @return the Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public UebTopicSink getUebTopicSink(String topicName); + + /** + * get the no-op Topic Sink for the given topic name + * + * @param topicName the topic name + * + * @return the Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public NoopTopicSink getNoopTopicSink(String topicName); + + /** + * get the DMAAP Topic Source for the given topic name + * + * @param topicName the topic name + * + * @return the Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSink getDmaapTopicSink(String topicName); + + /** + * gets only the UEB Topic Sources + * + * @return the UEB Topic Source List + */ + public List<UebTopicSource> getUebTopicSources(); + + /** + * gets only the DMAAP Topic Sources + * + * @return the DMAAP Topic Source List + */ + public List<DmaapTopicSource> getDmaapTopicSources(); + + /** + * gets all Topic Sinks + * + * @return the Topic Sink List + */ + public List<TopicSink> getTopicSinks(); + + /** + * gets only the UEB Topic Sinks + * + * @return the UEB Topic Sink List + */ + public List<UebTopicSink> getUebTopicSinks(); + + /** + * gets only the DMAAP Topic Sinks + * + * @return the DMAAP Topic Sink List + */ + public List<DmaapTopicSink> getDmaapTopicSinks(); + + /** + * gets only the NOOP Topic Sinks + * + * @return the NOOP Topic Sinks List + */ + public List<NoopTopicSink> getNoopTopicSinks(); + + /** + * singleton for global access + */ + public static final TopicEndpoint manager = new ProxyTopicEndpointManager(); } + /* * ----------------- implementation ------------------- */ @@ -246,398 +249,412 @@ public interface TopicEndpoint extends Startable, Lockable { * implementations according to the communication infrastructure that are supported */ class ProxyTopicEndpointManager implements TopicEndpoint { - /** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class); - /** - * Is this element locked? - */ - protected volatile boolean locked = false; - - /** - * Is this element alive? - */ - protected volatile boolean alive = false; - - @Override - public List<TopicSource> addTopicSources(Properties properties) { - - // 1. Create UEB Sources - // 2. Create DMAAP Sources - - List<TopicSource> sources = new ArrayList<>(); - - sources.addAll(UebTopicSource.factory.build(properties)); - sources.addAll(DmaapTopicSource.factory.build(properties)); - - if (this.isLocked()) { - for (TopicSource source : sources) { - source.lock(); - } - } - - return sources; - } - - @Override - public List<TopicSink> addTopicSinks(Properties properties) { - // 1. Create UEB Sinks - // 2. Create DMAAP Sinks - - List<TopicSink> sinks = new ArrayList<>(); - - sinks.addAll(UebTopicSink.factory.build(properties)); - sinks.addAll(DmaapTopicSink.factory.build(properties)); - sinks.addAll(NoopTopicSink.factory.build(properties)); - - if (this.isLocked()) { - for (TopicSink sink : sinks) { - sink.lock(); - } - } - - return sinks; - } - - @Override - public List<TopicSource> getTopicSources() { - - List<TopicSource> sources = new ArrayList<>(); - - sources.addAll(UebTopicSource.factory.inventory()); - sources.addAll(DmaapTopicSource.factory.inventory()); - - return sources; - } - - @Override - public List<TopicSink> getTopicSinks() { - - List<TopicSink> sinks = new ArrayList<>(); - - sinks.addAll(UebTopicSink.factory.inventory()); - sinks.addAll(DmaapTopicSink.factory.inventory()); - sinks.addAll(NoopTopicSink.factory.inventory()); - - return sinks; - } - - @JsonIgnore - @Override - public List<UebTopicSource> getUebTopicSources() { - return UebTopicSource.factory.inventory(); - } - - @JsonIgnore - @Override - public List<DmaapTopicSource> getDmaapTopicSources() { - return DmaapTopicSource.factory.inventory(); - } - - @JsonIgnore - @Override - public List<UebTopicSink> getUebTopicSinks() { - return UebTopicSink.factory.inventory(); - } - - @JsonIgnore - @Override - public List<DmaapTopicSink> getDmaapTopicSinks() { - return DmaapTopicSink.factory.inventory(); - } - - @JsonIgnore - @Override - public List<NoopTopicSink> getNoopTopicSinks() { - return NoopTopicSink.factory.inventory(); - } - - @Override - public boolean start() { - - synchronized (this) { - if (this.locked) { - throw new IllegalStateException(this + " is locked"); - } - - if (this.alive) { - return true; - } - - this.alive = true; - } - - List<Startable> endpoints = getEndpoints(); - - boolean success = true; - for (Startable endpoint: endpoints) { - try { - success = endpoint.start() && success; - } catch (Exception e) { - success = false; - logger.error("Problem starting endpoint: {}", endpoint, e); - } - } - - return success; - } - - - @Override - public boolean stop() { - - /* - * stop regardless if it is locked, in other - * words, stop operation has precedence over - * locks. - */ - synchronized (this) { - this.alive = false; - } - - List<Startable> endpoints = getEndpoints(); - - boolean success = true; - for (Startable endpoint: endpoints) { - try { - success = endpoint.stop() && success; - } catch (Exception e) { - success = false; - logger.error("Problem stopping endpoint: {}", endpoint, e); - } - } - - return success; - } - - /** - * - * @return list of managed endpoints - */ - @JsonIgnore - protected List<Startable> getEndpoints() { - List<Startable> endpoints = new ArrayList<>(); - - endpoints.addAll(this.getTopicSources()); - endpoints.addAll(this.getTopicSinks()); - - return endpoints; - } - - @Override - public void shutdown() { - UebTopicSource.factory.destroy(); - UebTopicSink.factory.destroy(); - - DmaapTopicSource.factory.destroy(); - DmaapTopicSink.factory.destroy(); - } - - @Override - public boolean isAlive() { - return this.alive; - } - - @Override - public boolean lock() { - - synchronized (this) { - if (locked) - return true; - - this.locked = true; - } - - for (TopicSource source: this.getTopicSources()) { - source.lock(); - } - - for (TopicSink sink: this.getTopicSinks()) { - sink.lock(); - } - - return true; - } - - @Override - public boolean unlock() { - synchronized (this) { - if (!locked) - return true; - - this.locked = false; - } - - for (TopicSource source: this.getTopicSources()) { - source.unlock(); - } - - for (TopicSink sink: this.getTopicSinks()) { - sink.unlock(); - } - - return true; - } - - @Override - public boolean isLocked() { - return this.locked; - } - - @Override - public List<TopicSource> getTopicSources(List<String> topicNames) { - - if (topicNames == null) { - throw new IllegalArgumentException("must provide a list of topics"); - } - - List<TopicSource> sources = new ArrayList<>(); - for (String topic: topicNames) { - try { - TopicSource uebSource = this.getUebTopicSource(topic); - if (uebSource != null) - sources.add(uebSource); - } catch (Exception e) { - logger.info("No UEB source for topic: {}", topic, e); - } - - try { - TopicSource dmaapSource = this.getDmaapTopicSource(topic); - if (dmaapSource != null) - sources.add(dmaapSource); - } catch (Exception e) { - logger.info("No DMAAP source for topic: {}", topic, e); - } - } - return sources; - } - - @Override - public List<TopicSink> getTopicSinks(List<String> topicNames) { - - if (topicNames == null) { - throw new IllegalArgumentException("must provide a list of topics"); - } - - List<TopicSink> sinks = new ArrayList<>(); - for (String topic: topicNames) { - try { - TopicSink uebSink = this.getUebTopicSink(topic); - if (uebSink != null) - sinks.add(uebSink); - } catch (Exception e) { - logger.info("No UEB sink for topic: {}", topic, e); - } - - try { - TopicSink dmaapSink = this.getDmaapTopicSink(topic); - if (dmaapSink != null) - sinks.add(dmaapSink); - } catch (Exception e) { - logger.info("No DMAAP sink for topic: {}", topic, e); - } - } - return sinks; - } - - @Override - public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) - throws UnsupportedOperationException { - - if (commType == null) { - throw new IllegalArgumentException - ("Invalid parameter: a communication infrastructure required to fetch " + topicName); - } - - if (topicName == null) { - throw new IllegalArgumentException - ("Invalid parameter: a communication infrastructure required to fetch " + topicName); - } - - switch (commType) { - case UEB: - return this.getUebTopicSource(topicName); - case DMAAP: - return this.getDmaapTopicSource(topicName); - case REST: - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } - } - - @Override - public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) - throws UnsupportedOperationException { - if (commType == null) { - throw new IllegalArgumentException - ("Invalid parameter: a communication infrastructure required to fetch " + topicName); - } - - if (topicName == null) { - throw new IllegalArgumentException - ("Invalid parameter: a communication infrastructure required to fetch " + topicName); - } - - switch (commType) { - case UEB: - return this.getUebTopicSink(topicName); - case DMAAP: - return this.getDmaapTopicSink(topicName); - case REST: - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } - } - - @Override - public List<TopicSink> getTopicSinks(String topicName) { - - if (topicName == null) { - throw new IllegalArgumentException - ("Invalid parameter: a communication infrastructure required to fetch " + topicName); - } - - List<TopicSink> sinks = new ArrayList<>(); - - try { - sinks.add(this.getUebTopicSink(topicName)); - } catch (Exception e) { - logger.debug("No sink for topic: {}", topicName, e); - } - - try { - sinks.add(this.getDmaapTopicSink(topicName)); - } catch (Exception e) { - logger.debug("No sink for topic: {}", topicName, e); - } - - return sinks; - } - - @Override - public UebTopicSource getUebTopicSource(String topicName) { - return UebTopicSource.factory.get(topicName); - } - - @Override - public UebTopicSink getUebTopicSink(String topicName) { - return UebTopicSink.factory.get(topicName); - } - - @Override - public DmaapTopicSource getDmaapTopicSource(String topicName) { - return DmaapTopicSource.factory.get(topicName); - } - - @Override - public DmaapTopicSink getDmaapTopicSink(String topicName) { - return DmaapTopicSink.factory.get(topicName); - } - - @Override - public NoopTopicSink getNoopTopicSink(String topicName) { - return NoopTopicSink.factory.get(topicName); - } - + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class); + /** + * Is this element locked? + */ + protected volatile boolean locked = false; + + /** + * Is this element alive? + */ + protected volatile boolean alive = false; + + @Override + public List<TopicSource> addTopicSources(Properties properties) { + + // 1. Create UEB Sources + // 2. Create DMAAP Sources + + final List<TopicSource> sources = new ArrayList<>(); + + sources.addAll(UebTopicSource.factory.build(properties)); + sources.addAll(DmaapTopicSource.factory.build(properties)); + + if (this.isLocked()) { + for (final TopicSource source : sources) { + source.lock(); + } + } + + return sources; + } + + @Override + public List<TopicSink> addTopicSinks(Properties properties) { + // 1. Create UEB Sinks + // 2. Create DMAAP Sinks + + final List<TopicSink> sinks = new ArrayList<>(); + + sinks.addAll(UebTopicSink.factory.build(properties)); + sinks.addAll(DmaapTopicSink.factory.build(properties)); + sinks.addAll(NoopTopicSink.factory.build(properties)); + + if (this.isLocked()) { + for (final TopicSink sink : sinks) { + sink.lock(); + } + } + + return sinks; + } + + @Override + public List<TopicSource> getTopicSources() { + + final List<TopicSource> sources = new ArrayList<>(); + + sources.addAll(UebTopicSource.factory.inventory()); + sources.addAll(DmaapTopicSource.factory.inventory()); + + return sources; + } + + @Override + public List<TopicSink> getTopicSinks() { + + final List<TopicSink> sinks = new ArrayList<>(); + + sinks.addAll(UebTopicSink.factory.inventory()); + sinks.addAll(DmaapTopicSink.factory.inventory()); + sinks.addAll(NoopTopicSink.factory.inventory()); + + return sinks; + } + + @JsonIgnore + @Override + public List<UebTopicSource> getUebTopicSources() { + return UebTopicSource.factory.inventory(); + } + + @JsonIgnore + @Override + public List<DmaapTopicSource> getDmaapTopicSources() { + return DmaapTopicSource.factory.inventory(); + } + + @JsonIgnore + @Override + public List<UebTopicSink> getUebTopicSinks() { + return UebTopicSink.factory.inventory(); + } + + @JsonIgnore + @Override + public List<DmaapTopicSink> getDmaapTopicSinks() { + return DmaapTopicSink.factory.inventory(); + } + + @JsonIgnore + @Override + public List<NoopTopicSink> getNoopTopicSinks() { + return NoopTopicSink.factory.inventory(); + } + + @Override + public boolean start() { + + synchronized (this) { + if (this.locked) { + throw new IllegalStateException(this + " is locked"); + } + + if (this.alive) { + return true; + } + + this.alive = true; + } + + final List<Startable> endpoints = this.getEndpoints(); + + boolean success = true; + for (final Startable endpoint : endpoints) { + try { + success = endpoint.start() && success; + } catch (final Exception e) { + success = false; + logger.error("Problem starting endpoint: {}", endpoint, e); + } + } + + return success; + } + + + @Override + public boolean stop() { + + /* + * stop regardless if it is locked, in other words, stop operation has precedence over locks. + */ + synchronized (this) { + this.alive = false; + } + + final List<Startable> endpoints = this.getEndpoints(); + + boolean success = true; + for (final Startable endpoint : endpoints) { + try { + success = endpoint.stop() && success; + } catch (final Exception e) { + success = false; + logger.error("Problem stopping endpoint: {}", endpoint, e); + } + } + + return success; + } + + /** + * + * @return list of managed endpoints + */ + @JsonIgnore + protected List<Startable> getEndpoints() { + final List<Startable> endpoints = new ArrayList<>(); + + endpoints.addAll(this.getTopicSources()); + endpoints.addAll(this.getTopicSinks()); + + return endpoints; + } + + @Override + public void shutdown() { + UebTopicSource.factory.destroy(); + UebTopicSink.factory.destroy(); + NoopTopicSink.factory.destroy(); + + DmaapTopicSource.factory.destroy(); + DmaapTopicSink.factory.destroy(); + } + + @Override + public boolean isAlive() { + return this.alive; + } + + @Override + public boolean lock() { + + synchronized (this) { + if (this.locked) + return true; + + this.locked = true; + } + + for (final TopicSource source : this.getTopicSources()) { + source.lock(); + } + + for (final TopicSink sink : this.getTopicSinks()) { + sink.lock(); + } + + return true; + } + + @Override + public boolean unlock() { + synchronized (this) { + if (!this.locked) + return true; + + this.locked = false; + } + + for (final TopicSource source : this.getTopicSources()) { + source.unlock(); + } + + for (final TopicSink sink : this.getTopicSinks()) { + sink.unlock(); + } + + return true; + } + + @Override + public boolean isLocked() { + return this.locked; + } + + @Override + public List<TopicSource> getTopicSources(List<String> topicNames) { + + if (topicNames == null) { + throw new IllegalArgumentException("must provide a list of topics"); + } + + final List<TopicSource> sources = new ArrayList<>(); + for (final String topic : topicNames) { + try { + final TopicSource uebSource = this.getUebTopicSource(topic); + if (uebSource != null) + sources.add(uebSource); + } catch (final Exception e) { + logger.debug("No UEB source for topic: {}", topic, e); + } + + try { + final TopicSource dmaapSource = this.getDmaapTopicSource(topic); + if (dmaapSource != null) + sources.add(dmaapSource); + } catch (final Exception e) { + logger.debug("No DMAAP source for topic: {}", topic, e); + } + } + return sources; + } + + @Override + public List<TopicSink> getTopicSinks(List<String> topicNames) { + + if (topicNames == null) { + throw new IllegalArgumentException("must provide a list of topics"); + } + + final List<TopicSink> sinks = new ArrayList<>(); + for (final String topic : topicNames) { + try { + final TopicSink uebSink = this.getUebTopicSink(topic); + if (uebSink != null) + sinks.add(uebSink); + } catch (final Exception e) { + logger.debug("No UEB sink for topic: {}", topic, e); + } + + try { + final TopicSink dmaapSink = this.getDmaapTopicSink(topic); + if (dmaapSink != null) + sinks.add(dmaapSink); + } catch (final Exception e) { + logger.debug("No DMAAP sink for topic: {}", topic, e); + } + + try { + final TopicSink noopSink = this.getNoopTopicSink(topic); + if (noopSink != null) + sinks.add(noopSink); + } catch (final Exception e) { + logger.debug("No NOOP sink for topic: {}", topic, e); + } + } + return sinks; + } + + @Override + public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) + throws UnsupportedOperationException { + + if (commType == null) { + throw new IllegalArgumentException( + "Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + if (topicName == null) { + throw new IllegalArgumentException( + "Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + switch (commType) { + case UEB: + return this.getUebTopicSource(topicName); + case DMAAP: + return this.getDmaapTopicSource(topicName); + case REST: + default: + throw new UnsupportedOperationException("Unsupported " + commType.name()); + } + } + + @Override + public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) + throws UnsupportedOperationException { + if (commType == null) { + throw new IllegalArgumentException( + "Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + if (topicName == null) { + throw new IllegalArgumentException( + "Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + switch (commType) { + case UEB: + return this.getUebTopicSink(topicName); + case DMAAP: + return this.getDmaapTopicSink(topicName); + case NOOP: + return this.getNoopTopicSink(topicName); + case REST: + default: + throw new UnsupportedOperationException("Unsupported " + commType.name()); + } + } + + @Override + public List<TopicSink> getTopicSinks(String topicName) { + if (topicName == null) { + throw new IllegalArgumentException( + "Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + final List<TopicSink> sinks = new ArrayList<>(); + + try { + sinks.add(this.getUebTopicSink(topicName)); + } catch (final Exception e) { + logger.debug("No sink for topic: {}", topicName, e); + } + + try { + sinks.add(this.getDmaapTopicSink(topicName)); + } catch (final Exception e) { + logger.debug("No sink for topic: {}", topicName, e); + } + + try { + sinks.add(this.getNoopTopicSink(topicName)); + } catch (final Exception e) { + logger.debug("No sink for topic: {}", topicName, e); + } + + return sinks; + } + + @Override + public UebTopicSource getUebTopicSource(String topicName) { + return UebTopicSource.factory.get(topicName); + } + + @Override + public UebTopicSink getUebTopicSink(String topicName) { + return UebTopicSink.factory.get(topicName); + } + + @Override + public DmaapTopicSource getDmaapTopicSource(String topicName) { + return DmaapTopicSource.factory.get(topicName); + } + + @Override + public DmaapTopicSink getDmaapTopicSink(String topicName) { + return DmaapTopicSink.factory.get(topicName); + } + + @Override + public NoopTopicSink getNoopTopicSink(String topicName) { + return NoopTopicSink.factory.get(topicName); + } + } diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java index 946e48c0..53315391 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -33,191 +33,194 @@ import org.slf4j.LoggerFactory; * Noop Topic Sink Factory */ public interface NoopTopicSinkFactory { - - /** - * Creates noop topic sinks based on properties files - * - * @param properties Properties containing initialization values - * - * @return a noop topic sink - * @throws IllegalArgumentException if invalid parameters are present - */ - public List<NoopTopicSink> build(Properties properties); - - /** - * builds a noop sink - * - * @param servers list of servers - * @param topic topic name - * @param managed is this sink endpoint managed? - * @return a noop topic sink - * @throws IllegalArgumentException if invalid parameters are present - */ - public NoopTopicSink build(List<String> servers, String topic, boolean managed); - - /** - * Destroys a sink based on the topic - * - * @param topic topic name - * @throws IllegalArgumentException if invalid parameters are present - */ - public void destroy(String topic); - - /** - * gets a sink based on topic name - * @param topic the topic name - * - * @return a sink with topic name - * @throws IllegalArgumentException if an invalid topic is provided - * @throws IllegalStateException if the sink is in an incorrect state - */ - public NoopTopicSink get(String topic); - - /** - * Provides a snapshot of the UEB Topic Writers - * @return a list of the UEB Topic Writers - */ - public List<NoopTopicSink> inventory(); - - /** - * Destroys all sinks - */ - public void destroy(); + + /** + * Creates noop topic sinks based on properties files + * + * @param properties Properties containing initialization values + * + * @return a noop topic sink + * @throws IllegalArgumentException if invalid parameters are present + */ + public List<NoopTopicSink> build(Properties properties); + + /** + * builds a noop sink + * + * @param servers list of servers + * @param topic topic name + * @param managed is this sink endpoint managed? + * @return a noop topic sink + * @throws IllegalArgumentException if invalid parameters are present + */ + public NoopTopicSink build(List<String> servers, String topic, boolean managed); + + /** + * Destroys a sink based on the topic + * + * @param topic topic name + * @throws IllegalArgumentException if invalid parameters are present + */ + public void destroy(String topic); + + /** + * gets a sink based on topic name + * + * @param topic the topic name + * + * @return a sink with topic name + * @throws IllegalArgumentException if an invalid topic is provided + * @throws IllegalStateException if the sink is in an incorrect state + */ + public NoopTopicSink get(String topic); + + /** + * Provides a snapshot of the UEB Topic Writers + * + * @return a list of the UEB Topic Writers + */ + public List<NoopTopicSink> inventory(); + + /** + * Destroys all sinks + */ + public void destroy(); } + /* ------------- implementation ----------------- */ /** * Factory of noop sinks */ class IndexedNoopTopicSinkFactory implements NoopTopicSinkFactory { - /** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class); - - /** - * noop topic sinks map - */ - protected HashMap<String, NoopTopicSink> noopTopicSinks = new HashMap<>(); - - @Override - public List<NoopTopicSink> build(Properties properties) { - - String sinkTopics = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS); - if (sinkTopics == null || sinkTopics.isEmpty()) { - logger.info("{}: no topic for noop sink", this); - return new ArrayList<>(); - } - - List<String> sinkTopicList = new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*"))); - List<NoopTopicSink> newSinks = new ArrayList<NoopTopicSink>(); - synchronized(this) { - for (String topic: sinkTopicList) { - if (this.noopTopicSinks.containsKey(topic)) { - newSinks.add(this.noopTopicSinks.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "." + - topic + - PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - if (servers == null || servers.isEmpty()) - servers = "noop"; - - List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - - String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + - PolicyProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - NoopTopicSink noopSink = this.build(serverList, topic, managed); - newSinks.add(noopSink); - } - return newSinks; - } - } - - @Override - public NoopTopicSink build(List<String> servers, String topic, boolean managed) { - if (servers == null) { - servers = new ArrayList<>(); - } - - if (servers.isEmpty()) { - servers.add("noop"); - } - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException("A topic must be provided"); - } - - synchronized (this) { - if (noopTopicSinks.containsKey(topic)) { - return noopTopicSinks.get(topic); - } - - NoopTopicSink sink = - new NoopTopicSink(servers, topic); - - if (managed) - noopTopicSinks.put(topic, sink); - - return sink; - } - } - - @Override - public void destroy(String topic) { - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException("A topic must be provided"); - } - - NoopTopicSink noopSink; - synchronized(this) { - if (!noopTopicSinks.containsKey(topic)) { - return; - } - - noopSink = noopTopicSinks.remove(topic); - } - - noopSink.shutdown(); - } - - @Override - public void destroy() { - List<NoopTopicSink> sinks = this.inventory(); - for (NoopTopicSink sink: sinks) { - sink.shutdown(); - } - - synchronized(this) { - this.noopTopicSinks.clear(); - } - } - - @Override - public NoopTopicSink get(String topic) { - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException("A topic must be provided"); - } - - synchronized(this) { - if (noopTopicSinks.containsKey(topic)) { - return noopTopicSinks.get(topic); - } else { - throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); - } - } - } - - @Override - public List<NoopTopicSink> inventory() { - return new ArrayList<>(this.noopTopicSinks.values()); - } + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class); + + /** + * noop topic sinks map + */ + protected HashMap<String, NoopTopicSink> noopTopicSinks = new HashMap<>(); + + @Override + public List<NoopTopicSink> build(Properties properties) { + + final String sinkTopics = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS); + if (sinkTopics == null || sinkTopics.isEmpty()) { + logger.info("{}: no topic for noop sink", this); + return new ArrayList<>(); + } + + final List<String> sinkTopicList = + new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*"))); + final List<NoopTopicSink> newSinks = new ArrayList<NoopTopicSink>(); + synchronized (this) { + for (final String topic : sinkTopicList) { + if (this.noopTopicSinks.containsKey(topic)) { + newSinks.add(this.noopTopicSinks.get(topic)); + continue; + } + + String servers = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "." + + topic + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + + if (servers == null || servers.isEmpty()) + servers = "noop"; + + final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + + final String managedString = + properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "." + topic + + PolicyProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + final NoopTopicSink noopSink = this.build(serverList, topic, managed); + newSinks.add(noopSink); + } + return newSinks; + } + } + + @Override + public NoopTopicSink build(List<String> servers, String topic, boolean managed) { + if (servers == null) { + servers = new ArrayList<>(); + } + + if (servers.isEmpty()) { + servers.add("noop"); + } + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + synchronized (this) { + if (this.noopTopicSinks.containsKey(topic)) { + return this.noopTopicSinks.get(topic); + } + + final NoopTopicSink sink = new NoopTopicSink(servers, topic); + + if (managed) + this.noopTopicSinks.put(topic, sink); + + return sink; + } + } + + @Override + public void destroy(String topic) { + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + NoopTopicSink noopSink; + synchronized (this) { + if (!this.noopTopicSinks.containsKey(topic)) { + return; + } + + noopSink = this.noopTopicSinks.remove(topic); + } + + noopSink.shutdown(); + } + + @Override + public void destroy() { + final List<NoopTopicSink> sinks = this.inventory(); + for (final NoopTopicSink sink : sinks) { + sink.shutdown(); + } + + synchronized (this) { + this.noopTopicSinks.clear(); + } + } + + @Override + public NoopTopicSink get(String topic) { + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + synchronized (this) { + if (this.noopTopicSinks.containsKey(topic)) { + return this.noopTopicSinks.get(topic); + } else { + throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); + } + } + } + + @Override + public List<NoopTopicSink> inventory() { + return new ArrayList<>(this.noopTopicSinks.values()); + } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java index edb03bba..8171c35d 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,427 +34,432 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; import com.att.nsa.cambria.client.CambriaConsumer; import com.att.nsa.mr.client.MRClientFactory; import com.att.nsa.mr.client.impl.MRConsumerImpl; import com.att.nsa.mr.client.response.MRConsumerResponse; import com.att.nsa.mr.test.clients.ProtocolTypeConstants; -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; /** * Wrapper around libraries to consume from message bus * */ public interface BusConsumer { - - /** - * fetch messages - * - * @return list of messages - * @throws Exception when error encountered by underlying libraries - */ - public Iterable<String> fetch() throws InterruptedException, IOException; - - /** - * close underlying library consumer - */ - public void close(); - - /** - * Cambria based consumer - */ - public static class CambriaConsumerWrapper implements BusConsumer { - - /** - * Cambria client - */ - protected CambriaConsumer consumer; - - /** - * Cambria Consumer Wrapper - * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit - * @throws GeneralSecurityException - * @throws MalformedURLException - */ - public CambriaConsumerWrapper(List<String> servers, String topic, - String apiKey, String apiSecret, - String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) - throws IllegalArgumentException { - - ConsumerBuilder builder = - new CambriaClientBuilders.ConsumerBuilder(); - - - if (useHttps){ - - if(useSelfSignedCerts){ - builder.knownAs(consumerGroup, consumerInstance) - .usingHosts(servers) - .onTopic(topic) - .waitAtServer(fetchTimeout) - .receivingAtMost(fetchLimit) - .usingHttps() - .allowSelfSignedCertificates(); - } - else{ - builder.knownAs(consumerGroup, consumerInstance) - .usingHosts(servers) - .onTopic(topic) - .waitAtServer(fetchTimeout) - .receivingAtMost(fetchLimit) - .usingHttps(); - } - } - else{ - builder.knownAs(consumerGroup, consumerInstance) - .usingHosts(servers) - .onTopic(topic) - .waitAtServer(fetchTimeout) - .receivingAtMost(fetchLimit); - } - - if (apiKey != null && !apiKey.isEmpty() && - apiSecret != null && !apiSecret.isEmpty()) { - builder.authenticatedBy(apiKey, apiSecret); - } - - try { - this.consumer = builder.build(); - } catch (MalformedURLException | GeneralSecurityException e) { - throw new IllegalArgumentException(e); - } - } - - @Override - public Iterable<String> fetch() throws IOException { - return this.consumer.fetch(); - } - - @Override - public void close() { - this.consumer.close(); - } - - @Override - public String toString() { - return "CambriaConsumerWrapper []"; - } - } - - /** - * MR based consumer - */ - public abstract class DmaapConsumerWrapper implements BusConsumer { - - /** - * logger - */ - private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class); - - /** - * fetch timeout - */ - protected int fetchTimeout; - - /** - * close condition - */ - protected Object closeCondition = new Object(); - - /** - * MR Consumer - */ - protected MRConsumerImpl consumer; - - /** - * MR Consumer Wrapper - * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param username AAF Login - * @param password AAF Password - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit - * @throws MalformedURLException - */ - public DmaapConsumerWrapper(List<String> servers, String topic, - String apiKey, String apiSecret, - String username, String password, - String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit, boolean useHttps) - throws MalformedURLException { - - this.fetchTimeout = fetchTimeout; - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException("No topic for DMaaP"); - } - - this.consumer = new MRConsumerImpl(servers, topic, - consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, - null, apiKey, apiSecret); - - this.consumer.setUsername(username); - this.consumer.setPassword(password); - } - - @Override - public Iterable<String> fetch() throws InterruptedException, IOException { - MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse(); - if (response == null) { - logger.warn("{}: DMaaP NULL response received", this); - - synchronized (closeCondition) { - closeCondition.wait(fetchTimeout); - } - return new ArrayList<>(); - } else { - logger.debug("DMaaP consumer received {} : {}" + - response.getResponseCode(), - response.getResponseMessage()); - - if (response.getResponseCode() == null || - !response.getResponseCode().equals("200")) { - - logger.error("DMaaP consumer received: {} : {}", - response.getResponseCode(), - response.getResponseMessage()); - - synchronized (closeCondition) { - closeCondition.wait(fetchTimeout); - } - - /* fall through */ - } - } - - if (response.getActualMessages() == null) - return new ArrayList<>(); - else - return response.getActualMessages(); - } - - @Override - public void close() { - synchronized (closeCondition) { - closeCondition.notifyAll(); - } - - this.consumer.close(); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder. - append("DmaapConsumerWrapper ["). - append("consumer.getAuthDate()=").append(consumer.getAuthDate()). - append(", consumer.getAuthKey()=").append(consumer.getAuthKey()). - append(", consumer.getHost()=").append(consumer.getHost()). - append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()). - append(", consumer.getUsername()=").append(consumer.getUsername()). - append("]"); - return builder.toString(); - } - } - - /** - * MR based consumer - */ - public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper { - - private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class); - - private Properties props; - - /** - * MR Consumer Wrapper - * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param aafLogin AAF Login - * @param aafPassword AAF Password - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit - * @throws MalformedURLException - */ - public DmaapAafConsumerWrapper(List<String> servers, String topic, - String apiKey, String apiSecret, - String aafLogin, String aafPassword, - String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit, boolean useHttps) throws MalformedURLException { - - super(servers, topic, apiKey, apiSecret, - aafLogin, aafPassword, - consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, useHttps); - - // super constructor sets servers = {""} if empty to avoid errors when using DME2 - if ((servers.size() == 1 && servers.get(0).equals("")) || - (servers == null) || (servers.isEmpty())) { - throw new IllegalArgumentException("Must provide at least one host for HTTP AAF"); - } - - this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - - props = new Properties(); - - if(useHttps){ - props.setProperty("Protocol", "https"); - this.consumer.setHost(servers.get(0) + ":3905"); - - } - else{ - props.setProperty("Protocol", "http"); - this.consumer.setHost(servers.get(0) + ":3904"); - } - - this.consumer.setProps(props); - logger.info("{}: CREATION", this); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - MRConsumerImpl consumer = (MRConsumerImpl) this.consumer; - - builder. - append("DmaapConsumerWrapper ["). - append("consumer.getAuthDate()=").append(consumer.getAuthDate()). - append(", consumer.getAuthKey()=").append(consumer.getAuthKey()). - append(", consumer.getHost()=").append(consumer.getHost()). - append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()). - append(", consumer.getUsername()=").append(consumer.getUsername()). - append("]"); - return builder.toString(); - } - } - - public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper { - - private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class); - - private Properties props; - - public DmaapDmeConsumerWrapper(List<String> servers, String topic, - String apiKey, String apiSecret, - String dme2Login, String dme2Password, - String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit, - String environment, String aftEnvironment, String dme2Partner, - String latitude, String longitude, Map<String,String> additionalProps, boolean useHttps) throws MalformedURLException { - - - - super(servers, topic, apiKey, apiSecret, - dme2Login, dme2Password, - consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, useHttps); - - - String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); - - if (environment == null || environment.isEmpty()) { - throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP"); - } if (aftEnvironment == null || aftEnvironment.isEmpty()) { - throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP"); - } if (latitude == null || latitude.isEmpty()) { - throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP"); - } if (longitude == null || longitude.isEmpty()) { - throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + " property for DME2 in DMaaP"); - } - - if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { - throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + - PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); - } - - String serviceName = servers.get(0); - - this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); - - this.consumer.setUsername(dme2Login); - this.consumer.setPassword(dme2Password); - - props = new Properties(); - - props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); - - props.setProperty("username", dme2Login); - props.setProperty("password", dme2Password); - - /* These are required, no defaults */ - props.setProperty("topic", topic); - - props.setProperty("Environment", environment); - props.setProperty("AFT_ENVIRONMENT", aftEnvironment); - - if (dme2Partner != null) - props.setProperty("Partner", dme2Partner); - if (dme2RouteOffer != null) - props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - - props.setProperty("Latitude", latitude); - props.setProperty("Longitude", longitude); - - /* These are optional, will default to these values if not set in additionalProps */ - props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); - props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); - props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); - props.setProperty("Version", "1.0"); - props.setProperty("SubContextPath", "/"); - props.setProperty("sessionstickinessrequired", "no"); - - /* These should not change */ - props.setProperty("TransportType", "DME2"); - props.setProperty("MethodType", "GET"); - - if(useHttps){ - props.setProperty("Protocol", "https"); - - } - else{ - props.setProperty("Protocol", "http"); - } - - props.setProperty("contenttype", "application/json"); - - if (additionalProps != null) { - for(String key : additionalProps.keySet()) - props.put(key, additionalProps.get(key)); - } - - MRClientFactory.prop = props; - this.consumer.setProps(props); - - logger.info("{}: CREATION", this); - } - } -} + /** + * fetch messages + * + * @return list of messages + * @throws Exception when error encountered by underlying libraries + */ + public Iterable<String> fetch() throws InterruptedException, IOException; + + /** + * close underlying library consumer + */ + public void close(); + + /** + * Cambria based consumer + */ + public static class CambriaConsumerWrapper implements BusConsumer { + + /** + * logger + */ + private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class); + + /** + * Cambria client + */ + protected CambriaConsumer consumer; + + /** + * fetch timeout + */ + protected int fetchTimeout; + + /** + * close condition + */ + protected Object closeCondition = new Object(); + + /** + * Cambria Consumer Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + * @throws GeneralSecurityException + * @throws MalformedURLException + */ + public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, + String apiSecret, String consumerGroup, String consumerInstance, int fetchTimeout, + int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) + throws IllegalArgumentException { + + this.fetchTimeout = fetchTimeout; + + final ConsumerBuilder builder = new CambriaClientBuilders.ConsumerBuilder(); + + if (useHttps) { + + if (useSelfSignedCerts) { + builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) + .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps() + .allowSelfSignedCertificates(); + } else { + builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) + .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps(); + } + } else { + builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) + .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit); + } + + if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { + builder.authenticatedBy(apiKey, apiSecret); + } + + try { + this.consumer = builder.build(); + } catch (MalformedURLException | GeneralSecurityException e) { + throw new IllegalArgumentException(e); + } + } + + @Override + public Iterable<String> fetch() throws IOException, InterruptedException { + try { + return this.consumer.fetch(); + } catch (final IOException e) { + logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(), + this.fetchTimeout); + synchronized (this.closeCondition) { + this.closeCondition.wait(this.fetchTimeout); + } + + throw e; + } + } + + @Override + public void close() { + synchronized (closeCondition) { + closeCondition.notifyAll(); + } + + this.consumer.close(); + } + + @Override + public String toString() { + final StringBuilder builder = new StringBuilder(); + builder.append("CambriaConsumerWrapper [fetchTimeout=").append(fetchTimeout).append("]"); + return builder.toString(); + } + } + + /** + * MR based consumer + */ + public abstract class DmaapConsumerWrapper implements BusConsumer { + + /** + * logger + */ + private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class); + + /** + * fetch timeout + */ + protected int fetchTimeout; + + /** + * close condition + */ + protected Object closeCondition = new Object(); + + /** + * MR Consumer + */ + protected MRConsumerImpl consumer; + + /** + * MR Consumer Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param username AAF Login + * @param password AAF Password + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + * @throws MalformedURLException + */ + public DmaapConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, + String username, String password, String consumerGroup, String consumerInstance, + int fetchTimeout, int fetchLimit, boolean useHttps) throws MalformedURLException { + + this.fetchTimeout = fetchTimeout; + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("No topic for DMaaP"); + } + + this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, + fetchTimeout, fetchLimit, null, apiKey, apiSecret); + + this.consumer.setUsername(username); + this.consumer.setPassword(password); + } + + @Override + public Iterable<String> fetch() throws InterruptedException, IOException { + final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse(); + if (response == null) { + logger.warn("{}: DMaaP NULL response received", this); + + synchronized (closeCondition) { + closeCondition.wait(fetchTimeout); + } + return new ArrayList<>(); + } else { + logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(), + response.getResponseMessage()); + + if (response.getResponseCode() == null || !response.getResponseCode().equals("200")) { + + logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(), + response.getResponseMessage()); + + synchronized (closeCondition) { + closeCondition.wait(fetchTimeout); + } + + /* fall through */ + } + } + + if (response.getActualMessages() == null) + return new ArrayList<>(); + else + return response.getActualMessages(); + } + + @Override + public void close() { + synchronized (closeCondition) { + closeCondition.notifyAll(); + } + + this.consumer.close(); + } + + @Override + public String toString() { + final StringBuilder builder = new StringBuilder(); + builder.append("DmaapConsumerWrapper [").append("consumer.getAuthDate()=") + .append(consumer.getAuthDate()).append(", consumer.getAuthKey()=") + .append(consumer.getAuthKey()).append(", consumer.getHost()=").append(consumer.getHost()) + .append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()) + .append(", consumer.getUsername()=").append(consumer.getUsername()).append("]"); + return builder.toString(); + } + } + /** + * MR based consumer + */ + public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper { + + private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class); + + private final Properties props; + + /** + * MR Consumer Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param aafLogin AAF Login + * @param aafPassword AAF Password + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + * @throws MalformedURLException + */ + public DmaapAafConsumerWrapper(List<String> servers, String topic, String apiKey, + String apiSecret, String aafLogin, String aafPassword, String consumerGroup, + String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps) + throws MalformedURLException { + + super(servers, topic, apiKey, apiSecret, aafLogin, aafPassword, consumerGroup, + consumerInstance, fetchTimeout, fetchLimit, useHttps); + + // super constructor sets servers = {""} if empty to avoid errors when using DME2 + if ((servers.size() == 1 && ("".equals(servers.get(0)))) || (servers == null) + || (servers.isEmpty())) { + throw new IllegalArgumentException("Must provide at least one host for HTTP AAF"); + } + + this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + + props = new Properties(); + + if (useHttps) { + props.setProperty("Protocol", "https"); + this.consumer.setHost(servers.get(0) + ":3905"); + + } else { + props.setProperty("Protocol", "http"); + this.consumer.setHost(servers.get(0) + ":3904"); + } + + this.consumer.setProps(props); + logger.info("{}: CREATION", this); + } + + @Override + public String toString() { + final StringBuilder builder = new StringBuilder(); + final MRConsumerImpl consumer = this.consumer; + + builder.append("DmaapConsumerWrapper [").append("consumer.getAuthDate()=") + .append(consumer.getAuthDate()).append(", consumer.getAuthKey()=") + .append(consumer.getAuthKey()).append(", consumer.getHost()=").append(consumer.getHost()) + .append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()) + .append(", consumer.getUsername()=").append(consumer.getUsername()).append("]"); + return builder.toString(); + } + } + + public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper { + + private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class); + + private final Properties props; + + public DmaapDmeConsumerWrapper(List<String> servers, String topic, String apiKey, + String apiSecret, String dme2Login, String dme2Password, String consumerGroup, + String consumerInstance, int fetchTimeout, int fetchLimit, String environment, + String aftEnvironment, String dme2Partner, String latitude, String longitude, + Map<String, String> additionalProps, boolean useHttps) throws MalformedURLException { + + + + super(servers, topic, apiKey, apiSecret, dme2Login, dme2Password, consumerGroup, + consumerInstance, fetchTimeout, fetchLimit, useHttps); + + + final String dme2RouteOffer = + additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); + + if (environment == null || environment.isEmpty()) { + throw new IllegalArgumentException( + "Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + + " property for DME2 in DMaaP"); + } + if (aftEnvironment == null || aftEnvironment.isEmpty()) { + throw new IllegalArgumentException( + "Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + + " property for DME2 in DMaaP"); + } + if (latitude == null || latitude.isEmpty()) { + throw new IllegalArgumentException("Missing " + + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP"); + } + if (longitude == null || longitude.isEmpty()) { + throw new IllegalArgumentException( + "Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + + " property for DME2 in DMaaP"); + } + + if ((dme2Partner == null || dme2Partner.isEmpty()) + && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { + throw new IllegalArgumentException( + "Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); + } + + final String serviceName = servers.get(0); + + this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); + + this.consumer.setUsername(dme2Login); + this.consumer.setPassword(dme2Password); + + props = new Properties(); + + props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); + + props.setProperty("username", dme2Login); + props.setProperty("password", dme2Password); + + /* These are required, no defaults */ + props.setProperty("topic", topic); + + props.setProperty("Environment", environment); + props.setProperty("AFT_ENVIRONMENT", aftEnvironment); + + if (dme2Partner != null) + props.setProperty("Partner", dme2Partner); + if (dme2RouteOffer != null) + props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + + props.setProperty("Latitude", latitude); + props.setProperty("Longitude", longitude); + + /* These are optional, will default to these values if not set in additionalProps */ + props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); + props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); + props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); + props.setProperty("Version", "1.0"); + props.setProperty("SubContextPath", "/"); + props.setProperty("sessionstickinessrequired", "no"); + + /* These should not change */ + props.setProperty("TransportType", "DME2"); + props.setProperty("MethodType", "GET"); + + if (useHttps) { + props.setProperty("Protocol", "https"); + + } else { + props.setProperty("Protocol", "http"); + } + + props.setProperty("contenttype", "application/json"); + + if (additionalProps != null) { + for (final String key : additionalProps.keySet()) + props.put(key, additionalProps.get(key)); + } + + MRClientFactory.prop = props; + this.consumer.setProps(props); + + logger.info("{}: CREATION", this); + } + } +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/drools/http/server/test/HttpClientTest.java b/policy-endpoints/src/test/java/org/onap/policy/drools/http/server/test/HttpClientTest.java index f7ef7bcf..dd9a7c2b 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/drools/http/server/test/HttpClientTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/drools/http/server/test/HttpClientTest.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,6 +22,7 @@ package org.onap.policy.drools.http.server.test; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.ArrayList; import java.util.Properties; @@ -33,186 +34,170 @@ import org.junit.Test; import org.onap.policy.drools.http.client.HttpClient; import org.onap.policy.drools.http.server.HttpServletServer; import org.onap.policy.drools.properties.PolicyProperties; +import org.onap.policy.drools.utils.NetworkUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class HttpClientTest { - - private static Logger logger = LoggerFactory.getLogger(HttpClientTest.class); - - @BeforeClass - public static void setUp() throws InterruptedException { - logger.info("-- setup() --"); - - /* echo server */ - - HttpServletServer echoServerNoAuth = HttpServletServer.factory.build("echo", "localhost", 6666, "/", false, true); - echoServerNoAuth.addServletPackage("/*", HttpClientTest.class.getPackage().getName()); - echoServerNoAuth.waitedStart(5000); - - /* no auth echo server */ - - HttpServletServer echoServerAuth = HttpServletServer.factory.build("echo", "localhost", 6667, "/", false, true); - echoServerAuth.setBasicAuthentication("x", "y", null); - echoServerAuth.addServletPackage("/*", HttpClientTest.class.getPackage().getName()); - echoServerAuth.waitedStart(5000); - } - - @AfterClass - public static void tearDown() { - logger.info("-- tearDown() --"); - - HttpServletServer.factory.destroy(); - HttpClient.factory.destroy(); - } - - @Test - public void testHttpNoAuthClient() throws Exception { - logger.info("-- testHttpNoAuthClient() --"); - - HttpClient client = HttpClient.factory.build("testHttpNoAuthClient", false, false, - "localhost", 6666, "junit/echo", - null, null, true); - Response response = client.get("hello"); - String body = HttpClient.getBody(response, String.class); - - assertTrue(response.getStatus() == 200); - assertTrue(body.equals("hello")); - } - - @Test - public void testHttpAuthClient() throws Exception { - logger.info("-- testHttpAuthClient() --"); - - HttpClient client = HttpClient.factory.build("testHttpAuthClient",false, false, - "localhost", 6667, "junit/echo", - "x", "y", true); - Response response = client.get("hello"); - String body = HttpClient.getBody(response, String.class); - - assertTrue(response.getStatus() == 200); - assertTrue(body.equals("hello")); - } - - @Test - public void testHttpAuthClient401() throws Exception { - logger.info("-- testHttpAuthClient401() --"); - - HttpClient client = HttpClient.factory.build("testHttpAuthClient401",false, false, - "localhost", 6667, "junit/echo", - null, null, true); - Response response = client.get("hello"); - assertTrue(response.getStatus() == 401); - } - - @Test - public void testHttpAuthClientProps() throws Exception { - logger.info("-- testHttpAuthClientProps() --"); - - Properties httpProperties = new Properties(); - - httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES, "PAP,PDP"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, - "localhost"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, - "7777"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, - "testpap"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, - "alpha123"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX, - RestMockHealthCheck.class.getName()); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_MANAGED_SUFFIX, - "true"); - - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, - "localhost"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, - "7778"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, - "testpdp"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, - "alpha123"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX, - RestMockHealthCheck.class.getName()); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_MANAGED_SUFFIX, - "true"); - - httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES, "PAP,PDP"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, - "localhost"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, - "7777"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_URL_SUFFIX, - "pap/test"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX, - "false"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, - "testpap"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, - "alpha123"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_MANAGED_SUFFIX, - "true"); - - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, - "localhost"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, - "7778"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_URL_SUFFIX, - "pdp"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX, - "false"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, - "testpdp"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, - "alpha123"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_MANAGED_SUFFIX, - "true"); - - ArrayList<HttpServletServer> servers = HttpServletServer.factory.build(httpProperties); - assertTrue(servers.size() == 2); - - ArrayList<HttpClient> clients = HttpClient.factory.build(httpProperties); - assertTrue(clients.size() == 2); - - for (HttpServletServer server: servers) { - server.waitedStart(5000); - } - - HttpClient clientPAP = HttpClient.factory.get("PAP"); - Response response = clientPAP.get(); - assertTrue(response.getStatus() == 200); - - HttpClient clientPDP = HttpClient.factory.get("PDP"); - Response response2 = clientPDP.get("test"); - assertTrue(response2.getStatus() == 500); + + private static Logger logger = LoggerFactory.getLogger(HttpClientTest.class); + + @BeforeClass + public static void setUp() throws InterruptedException, IOException { + logger.info("-- setup() --"); + + /* echo server */ + + final HttpServletServer echoServerNoAuth = + HttpServletServer.factory.build("echo", "localhost", 6666, "/", false, true); + echoServerNoAuth.addServletPackage("/*", HttpClientTest.class.getPackage().getName()); + echoServerNoAuth.waitedStart(5000); + + if (!NetworkUtil.isTcpPortOpen("localhost", echoServerNoAuth.getPort(), 5, 10000L)) + throw new IllegalStateException("cannot connect to port " + echoServerNoAuth.getPort()); + + /* no auth echo server */ + + final HttpServletServer echoServerAuth = + HttpServletServer.factory.build("echo", "localhost", 6667, "/", false, true); + echoServerAuth.setBasicAuthentication("x", "y", null); + echoServerAuth.addServletPackage("/*", HttpClientTest.class.getPackage().getName()); + echoServerAuth.waitedStart(5000); + + if (!NetworkUtil.isTcpPortOpen("localhost", echoServerAuth.getPort(), 5, 10000L)) + throw new IllegalStateException("cannot connect to port " + echoServerAuth.getPort()); + } + + @AfterClass + public static void tearDown() { + logger.info("-- tearDown() --"); + + HttpServletServer.factory.destroy(); + HttpClient.factory.destroy(); + } + + @Test + public void testHttpNoAuthClient() throws Exception { + logger.info("-- testHttpNoAuthClient() --"); + + final HttpClient client = HttpClient.factory.build("testHttpNoAuthClient", false, false, + "localhost", 6666, "junit/echo", null, null, true); + final Response response = client.get("hello"); + final String body = HttpClient.getBody(response, String.class); + + assertTrue(response.getStatus() == 200); + assertTrue(body.equals("hello")); + } + + @Test + public void testHttpAuthClient() throws Exception { + logger.info("-- testHttpAuthClient() --"); + + final HttpClient client = HttpClient.factory.build("testHttpAuthClient", false, false, + "localhost", 6667, "junit/echo", "x", "y", true); + final Response response = client.get("hello"); + final String body = HttpClient.getBody(response, String.class); + + assertTrue(response.getStatus() == 200); + assertTrue(body.equals("hello")); + } + + @Test + public void testHttpAuthClient401() throws Exception { + logger.info("-- testHttpAuthClient401() --"); + + final HttpClient client = HttpClient.factory.build("testHttpAuthClient401", false, false, + "localhost", 6667, "junit/echo", null, null, true); + final Response response = client.get("hello"); + assertTrue(response.getStatus() == 401); + } + + @Test + public void testHttpAuthClientProps() throws Exception { + logger.info("-- testHttpAuthClientProps() --"); + + final Properties httpProperties = new Properties(); + + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES, "PAP,PDP"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, "localhost"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, "7777"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, "testpap"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, "alpha123"); + httpProperties.setProperty( + PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX, + RestMockHealthCheck.class.getName()); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "true"); + + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, "localhost"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, "7778"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, "testpdp"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, "alpha123"); + httpProperties.setProperty( + PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX, + RestMockHealthCheck.class.getName()); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "true"); + + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES, "PAP,PDP"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, "localhost"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, "7777"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_HTTP_URL_SUFFIX, "pap/test"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX, "false"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, "testpap"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, "alpha123"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "true"); + + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, "localhost"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, "7778"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_HTTP_URL_SUFFIX, "pdp"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX, "false"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, "testpdp"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, "alpha123"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "true"); + + final ArrayList<HttpServletServer> servers = HttpServletServer.factory.build(httpProperties); + assertTrue(servers.size() == 2); + + final ArrayList<HttpClient> clients = HttpClient.factory.build(httpProperties); + assertTrue(clients.size() == 2); + + for (final HttpServletServer server : servers) { + server.waitedStart(10000); } + final HttpClient clientPAP = HttpClient.factory.get("PAP"); + final Response response = clientPAP.get(); + assertTrue(response.getStatus() == 200); + + final HttpClient clientPDP = HttpClient.factory.get("PDP"); + final Response response2 = clientPDP.get("test"); + assertTrue(response2.getStatus() == 500); + } + } |