From 64f53ef14f5a9ea98208fd2b835bfb01fda9a5f9 Mon Sep 17 00:00:00 2001 From: mmis Date: Thu, 19 Jul 2018 13:21:08 +0100 Subject: Copy policy-endpoints from drools-pdp to common Removed policy-endpoints, and 3 classes from policy-core. Replaced refenences to the deleted classes with references to the corresponding classes in policy-common Issue-ID: POLICY-967 Change-Id: I547cde4894424b8f40b7ddd4e2342ebb729cb588 Signed-off-by: mmis --- .../drools/event/comm/FilterableTopicSource.java | 39 -- .../org/onap/policy/drools/event/comm/Topic.java | 86 --- .../policy/drools/event/comm/TopicEndpoint.java | 658 --------------------- .../policy/drools/event/comm/TopicListener.java | 38 -- .../drools/event/comm/TopicRegisterable.java | 42 -- .../onap/policy/drools/event/comm/TopicSink.java | 40 -- .../onap/policy/drools/event/comm/TopicSource.java | 37 -- .../drools/event/comm/bus/ApiKeyEnabled.java | 36 -- .../policy/drools/event/comm/bus/BusTopicSink.java | 47 -- .../drools/event/comm/bus/BusTopicSource.java | 78 --- .../drools/event/comm/bus/DmaapTopicSink.java | 30 - .../event/comm/bus/DmaapTopicSinkFactory.java | 465 --------------- .../drools/event/comm/bus/DmaapTopicSource.java | 29 - .../event/comm/bus/DmaapTopicSourceFactory.java | 587 ------------------ .../drools/event/comm/bus/NoopTopicSink.java | 126 ---- .../event/comm/bus/NoopTopicSinkFactory.java | 230 ------- .../policy/drools/event/comm/bus/UebTopicSink.java | 32 - .../drools/event/comm/bus/UebTopicSinkFactory.java | 306 ---------- .../drools/event/comm/bus/UebTopicSource.java | 34 -- .../event/comm/bus/UebTopicSourceFactory.java | 394 ------------ .../event/comm/bus/internal/BusConsumer.java | 546 ----------------- .../event/comm/bus/internal/BusPublisher.java | 391 ------------ .../event/comm/bus/internal/BusTopicBase.java | 131 ---- .../comm/bus/internal/InlineBusTopicSink.java | 212 ------- .../comm/bus/internal/InlineDmaapTopicSink.java | 144 ----- .../comm/bus/internal/InlineUebTopicSink.java | 94 --- .../bus/internal/SingleThreadedBusTopicSource.java | 332 ----------- .../internal/SingleThreadedDmaapTopicSource.java | 197 ------ .../bus/internal/SingleThreadedUebTopicSource.java | 95 --- .../drools/event/comm/bus/internal/TopicBase.java | 228 ------- .../onap/policy/drools/http/client/HttpClient.java | 48 -- .../drools/http/client/HttpClientFactory.java | 221 ------- .../drools/http/client/internal/JerseyClient.java | 249 -------- .../drools/http/server/HttpServletServer.java | 82 --- .../http/server/HttpServletServerFactory.java | 261 -------- .../http/server/internal/JettyJerseyServer.java | 249 -------- .../http/server/internal/JettyServletServer.java | 388 ------------ 37 files changed, 7202 deletions(-) delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/FilterableTopicSource.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/Topic.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicListener.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicRegisterable.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicSink.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicSource.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/ApiKeyEnabled.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/BusTopicSink.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/BusTopicSource.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSink.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSource.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSink.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSink.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSinkFactory.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSource.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSourceFactory.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusTopicBase.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/TopicBase.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/http/client/HttpClient.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/http/client/HttpClientFactory.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/http/client/internal/JerseyClient.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/http/server/HttpServletServer.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/http/server/HttpServletServerFactory.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/http/server/internal/JettyJerseyServer.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/http/server/internal/JettyServletServer.java (limited to 'policy-endpoints/src/main') diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/FilterableTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/FilterableTopicSource.java deleted file mode 100644 index b1e0e1c2..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/FilterableTopicSource.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm; - -/** - * TopicSource that supports server-side filtering. - */ -public interface FilterableTopicSource extends TopicSource { - - /** - * Sets the server-side filter. - * - * @param filter new filter value, or {@code null} - * @throws UnsupportedOperationException if the consumer does not support - * server-side filtering - * @throws IllegalArgumentException if the consumer cannot be built with the - * new filter - */ - public void setFilter(String filter); - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/Topic.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/Topic.java deleted file mode 100644 index 30174f1f..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/Topic.java +++ /dev/null @@ -1,86 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm; - -import java.util.List; - -import org.onap.policy.drools.properties.Lockable; -import org.onap.policy.drools.properties.Startable; - -/** - * Essential Topic Data - */ -public interface Topic extends TopicRegisterable, Startable, Lockable { - - /** - * network logger - */ - public static final String NETWORK_LOGGER = "network"; - - /** - * Underlying Communication infrastructure Types - */ - public enum CommInfrastructure { - /** - * UEB Communication Infrastructure - */ - UEB, - /** - * DMAAP Communication Infrastructure - */ - DMAAP, - /** - * NOOP for internal use only - */ - NOOP, - /** - * REST Communication Infrastructure - */ - REST - } - - /** - * gets the topic name - * - * @return topic name - */ - public String getTopic(); - - /** - * gets the communication infrastructure type - * @return - */ - public CommInfrastructure getTopicCommInfrastructure(); - - /** - * return list of servers - * @return bus servers - */ - public List getServers(); - - /** - * get the more recent events in this topic entity - * - * @return list of most recent events - */ - public String[] getRecentEvents(); - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java deleted file mode 100644 index 5c04bb8f..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java +++ /dev/null @@ -1,658 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm; - -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; - -import org.onap.policy.drools.event.comm.bus.DmaapTopicSink; -import org.onap.policy.drools.event.comm.bus.DmaapTopicSource; -import org.onap.policy.drools.event.comm.bus.NoopTopicSink; -import org.onap.policy.drools.event.comm.bus.UebTopicSink; -import org.onap.policy.drools.event.comm.bus.UebTopicSource; -import org.onap.policy.drools.properties.Lockable; -import org.onap.policy.drools.properties.Startable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.annotation.JsonIgnore; - -/** - * Abstraction to managed the system's Networked Topic Endpoints, sources of all events input into - * the System. - */ -public interface TopicEndpoint extends Startable, Lockable { - - /** - * singleton for global access - */ - public static final TopicEndpoint manager = new ProxyTopicEndpointManager(); - - /** - * Add Topic Sources to the communication infrastructure initialized per properties - * - * @param properties properties for Topic Source construction - * @return a generic Topic Source - * @throws IllegalArgumentException when invalid arguments are provided - */ - public List addTopicSources(Properties properties); - - /** - * Add Topic Sinks to the communication infrastructure initialized per properties - * - * @param properties properties for Topic Sink construction - * @return a generic Topic Sink - * @throws IllegalArgumentException when invalid arguments are provided - */ - public List addTopicSinks(Properties properties); - - /** - * gets all Topic Sources - * - * @return the Topic Source List - */ - List getTopicSources(); - - /** - * get the Topic Sources for the given topic name - * - * @param topicName the topic name - * - * @return the Topic Source List - * @throws IllegalStateException if the entity is in an invalid state - * @throws IllegalArgumentException if invalid parameters are present - */ - public List getTopicSources(List topicNames); - - /** - * gets the Topic Source for the given topic name and underlying communication infrastructure type - * - * @param commType communication infrastructure type - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - * @throws UnsupportedOperationException if the operation is not supported. - */ - public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName); - - /** - * get the UEB Topic Source for the given topic name - * - * @param topicName the topic name - * - * @return the UEB Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public UebTopicSource getUebTopicSource(String topicName); - - /** - * get the DMAAP Topic Source for the given topic name - * - * @param topicName the topic name - * - * @return the DMAAP Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public DmaapTopicSource getDmaapTopicSource(String topicName); - - /** - * get the Topic Sinks for the given topic name - * - * @param topicNames the topic names - * @return the Topic Sink List - * @throws IllegalStateException - * @throws IllegalArgumentException - */ - public List getTopicSinks(List topicNames); - - /** - * get the Topic Sinks for the given topic name and underlying communication infrastructure type - * - * @param topicName the topic name - * @param commType communication infrastructure type - * - * @return the Topic Sink List - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicWriters for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName); - - /** - * get the Topic Sinks for the given topic name and all the underlying communication - * infrastructure type - * - * @param topicName the topic name - * @param commType communication infrastructure type - * - * @return the Topic Sink List - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicWriters for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public List getTopicSinks(String topicName); - - /** - * get the UEB Topic Source for the given topic name - * - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public UebTopicSink getUebTopicSink(String topicName); - - /** - * get the no-op Topic Sink for the given topic name - * - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public NoopTopicSink getNoopTopicSink(String topicName); - - /** - * get the DMAAP Topic Source for the given topic name - * - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public DmaapTopicSink getDmaapTopicSink(String topicName); - - /** - * gets only the UEB Topic Sources - * - * @return the UEB Topic Source List - */ - public List getUebTopicSources(); - - /** - * gets only the DMAAP Topic Sources - * - * @return the DMAAP Topic Source List - */ - public List getDmaapTopicSources(); - - /** - * gets all Topic Sinks - * - * @return the Topic Sink List - */ - public List getTopicSinks(); - - /** - * gets only the UEB Topic Sinks - * - * @return the UEB Topic Sink List - */ - public List getUebTopicSinks(); - - /** - * gets only the DMAAP Topic Sinks - * - * @return the DMAAP Topic Sink List - */ - public List getDmaapTopicSinks(); - - /** - * gets only the NOOP Topic Sinks - * - * @return the NOOP Topic Sinks List - */ - public List getNoopTopicSinks(); -} - - -/* - * ----------------- implementation ------------------- - */ - -/** - * This implementation of the Topic Endpoint Manager, proxies operations to appropriate - * implementations according to the communication infrastructure that are supported - */ -class ProxyTopicEndpointManager implements TopicEndpoint { - /** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class); - /** - * Is this element locked? - */ - protected volatile boolean locked = false; - - /** - * Is this element alive? - */ - protected volatile boolean alive = false; - - @Override - public List addTopicSources(Properties properties) { - - // 1. Create UEB Sources - // 2. Create DMAAP Sources - - final List sources = new ArrayList<>(); - - sources.addAll(UebTopicSource.factory.build(properties)); - sources.addAll(DmaapTopicSource.factory.build(properties)); - - if (this.isLocked()) { - for (final TopicSource source : sources) { - source.lock(); - } - } - - return sources; - } - - @Override - public List addTopicSinks(Properties properties) { - // 1. Create UEB Sinks - // 2. Create DMAAP Sinks - - final List sinks = new ArrayList<>(); - - sinks.addAll(UebTopicSink.factory.build(properties)); - sinks.addAll(DmaapTopicSink.factory.build(properties)); - sinks.addAll(NoopTopicSink.factory.build(properties)); - - if (this.isLocked()) { - for (final TopicSink sink : sinks) { - sink.lock(); - } - } - - return sinks; - } - - @Override - public List getTopicSources() { - - final List sources = new ArrayList<>(); - - sources.addAll(UebTopicSource.factory.inventory()); - sources.addAll(DmaapTopicSource.factory.inventory()); - - return sources; - } - - @Override - public List getTopicSinks() { - - final List sinks = new ArrayList<>(); - - sinks.addAll(UebTopicSink.factory.inventory()); - sinks.addAll(DmaapTopicSink.factory.inventory()); - sinks.addAll(NoopTopicSink.factory.inventory()); - - return sinks; - } - - @JsonIgnore - @Override - public List getUebTopicSources() { - return UebTopicSource.factory.inventory(); - } - - @JsonIgnore - @Override - public List getDmaapTopicSources() { - return DmaapTopicSource.factory.inventory(); - } - - @JsonIgnore - @Override - public List getUebTopicSinks() { - return UebTopicSink.factory.inventory(); - } - - @JsonIgnore - @Override - public List getDmaapTopicSinks() { - return DmaapTopicSink.factory.inventory(); - } - - @JsonIgnore - @Override - public List getNoopTopicSinks() { - return NoopTopicSink.factory.inventory(); - } - - @Override - public boolean start() { - - synchronized (this) { - if (this.locked) { - throw new IllegalStateException(this + " is locked"); - } - - if (this.alive) { - return true; - } - - this.alive = true; - } - - final List endpoints = this.getEndpoints(); - - boolean success = true; - for (final Startable endpoint : endpoints) { - try { - success = endpoint.start() && success; - } catch (final Exception e) { - success = false; - logger.error("Problem starting endpoint: {}", endpoint, e); - } - } - - return success; - } - - - @Override - public boolean stop() { - - /* - * stop regardless if it is locked, in other words, stop operation has precedence over locks. - */ - synchronized (this) { - this.alive = false; - } - - final List endpoints = this.getEndpoints(); - - boolean success = true; - for (final Startable endpoint : endpoints) { - try { - success = endpoint.stop() && success; - } catch (final Exception e) { - success = false; - logger.error("Problem stopping endpoint: {}", endpoint, e); - } - } - - return success; - } - - /** - * - * @return list of managed endpoints - */ - @JsonIgnore - protected List getEndpoints() { - final List endpoints = new ArrayList<>(); - - endpoints.addAll(this.getTopicSources()); - endpoints.addAll(this.getTopicSinks()); - - return endpoints; - } - - @Override - public void shutdown() { - UebTopicSource.factory.destroy(); - UebTopicSink.factory.destroy(); - NoopTopicSink.factory.destroy(); - - DmaapTopicSource.factory.destroy(); - DmaapTopicSink.factory.destroy(); - } - - @Override - public boolean isAlive() { - return this.alive; - } - - @Override - public boolean lock() { - - synchronized (this) { - if (this.locked) - return true; - - this.locked = true; - } - - for (final TopicSource source : this.getTopicSources()) { - source.lock(); - } - - for (final TopicSink sink : this.getTopicSinks()) { - sink.lock(); - } - - return true; - } - - @Override - public boolean unlock() { - synchronized (this) { - if (!this.locked) - return true; - - this.locked = false; - } - - for (final TopicSource source : this.getTopicSources()) { - source.unlock(); - } - - for (final TopicSink sink : this.getTopicSinks()) { - sink.unlock(); - } - - return true; - } - - @Override - public boolean isLocked() { - return this.locked; - } - - @Override - public List getTopicSources(List topicNames) { - - if (topicNames == null) { - throw new IllegalArgumentException("must provide a list of topics"); - } - - final List sources = new ArrayList<>(); - for (final String topic : topicNames) { - try { - final TopicSource uebSource = this.getUebTopicSource(topic); - if (uebSource != null) - sources.add(uebSource); - } catch (final Exception e) { - logger.debug("No UEB source for topic: {}", topic, e); - } - - try { - final TopicSource dmaapSource = this.getDmaapTopicSource(topic); - if (dmaapSource != null) - sources.add(dmaapSource); - } catch (final Exception e) { - logger.debug("No DMAAP source for topic: {}", topic, e); - } - } - return sources; - } - - @Override - public List getTopicSinks(List topicNames) { - - if (topicNames == null) { - throw new IllegalArgumentException("must provide a list of topics"); - } - - final List sinks = new ArrayList<>(); - for (final String topic : topicNames) { - try { - final TopicSink uebSink = this.getUebTopicSink(topic); - if (uebSink != null) - sinks.add(uebSink); - } catch (final Exception e) { - logger.debug("No UEB sink for topic: {}", topic, e); - } - - try { - final TopicSink dmaapSink = this.getDmaapTopicSink(topic); - if (dmaapSink != null) - sinks.add(dmaapSink); - } catch (final Exception e) { - logger.debug("No DMAAP sink for topic: {}", topic, e); - } - - try { - final TopicSink noopSink = this.getNoopTopicSink(topic); - if (noopSink != null) - sinks.add(noopSink); - } catch (final Exception e) { - logger.debug("No NOOP sink for topic: {}", topic, e); - } - } - return sinks; - } - - @Override - public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) { - - if (commType == null) { - throw parmException(topicName); - } - - if (topicName == null) { - throw parmException(topicName); - } - - switch (commType) { - case UEB: - return this.getUebTopicSource(topicName); - case DMAAP: - return this.getDmaapTopicSource(topicName); - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } - } - - private IllegalArgumentException parmException(String topicName) { - return new IllegalArgumentException( - "Invalid parameter: a communication infrastructure required to fetch " + topicName); - } - - @Override - public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) { - if (commType == null) { - throw parmException(topicName); - } - - if (topicName == null) { - throw parmException(topicName); - } - - switch (commType) { - case UEB: - return this.getUebTopicSink(topicName); - case DMAAP: - return this.getDmaapTopicSink(topicName); - case NOOP: - return this.getNoopTopicSink(topicName); - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } - } - - @Override - public List getTopicSinks(String topicName) { - if (topicName == null) { - throw parmException(topicName); - } - - final List sinks = new ArrayList<>(); - - try { - sinks.add(this.getUebTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - try { - sinks.add(this.getDmaapTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - try { - sinks.add(this.getNoopTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - return sinks; - } - -private void logNoSink(String topicName, Exception ex) { - logger.debug("No sink for topic: {}", topicName, ex); -} - - @Override - public UebTopicSource getUebTopicSource(String topicName) { - return UebTopicSource.factory.get(topicName); - } - - @Override - public UebTopicSink getUebTopicSink(String topicName) { - return UebTopicSink.factory.get(topicName); - } - - @Override - public DmaapTopicSource getDmaapTopicSource(String topicName) { - return DmaapTopicSource.factory.get(topicName); - } - - @Override - public DmaapTopicSink getDmaapTopicSink(String topicName) { - return DmaapTopicSink.factory.get(topicName); - } - - @Override - public NoopTopicSink getNoopTopicSink(String topicName) { - return NoopTopicSink.factory.get(topicName); - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicListener.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicListener.java deleted file mode 100644 index 4c8552b6..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicListener.java +++ /dev/null @@ -1,38 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm; - -/** - * Listener for event messages entering the Policy Engine - */ -@FunctionalInterface -public interface TopicListener { - - /** - * Notification of a new Event over a given Topic - * - * @param commType communication infrastructure type - * @param topic topic name - * @param event event message as a string - */ - public void onTopicEvent(Topic.CommInfrastructure commType, String topic, String event); - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicRegisterable.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicRegisterable.java deleted file mode 100644 index 540025e5..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicRegisterable.java +++ /dev/null @@ -1,42 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm; - -/** - * Marks a Topic entity as registerable - */ -public interface TopicRegisterable { - - /** - * Register for notification of events with this Topic Entity - * - * @param topicListener the listener of events - */ - public void register(TopicListener topicListener); - - /** - * Unregisters for notification of events with this Topic Entity - * - * @param topicListener the listener of events - */ - public void unregister(TopicListener topicListener); - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicSink.java deleted file mode 100644 index 5ea849ee..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicSink.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm; - -/** - * Marks a given Topic Endpoint as able to send messages over a topic - */ -public interface TopicSink extends Topic { - - /** - * Sends a string message over this Topic Endpoint - * - * @param message message to send - * - * @return true if the send operation succeeded, false otherwise - * @throws IllegalArgumentException an invalid message has been provided - * @throws IllegalStateException the entity is in an state that prevents - * it from sending messages, for example, locked or stopped. - */ - public boolean send(String message); - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicSource.java deleted file mode 100644 index 17cde607..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicSource.java +++ /dev/null @@ -1,37 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm; - -/** - * Marker for a Topic Entity, indicating that the entity is able to read - * over a topic - */ -public interface TopicSource extends Topic { - - /** - * pushes an event into the source programatically - * - * @param event the event in json format - * @return true if it can be processed correctly, false otherwise - */ - public boolean offer(String event); - -} \ No newline at end of file diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/ApiKeyEnabled.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/ApiKeyEnabled.java deleted file mode 100644 index 9ddf4fff..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/ApiKeyEnabled.java +++ /dev/null @@ -1,36 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm.bus; - -/** - * API - */ -public interface ApiKeyEnabled { - /** - * @return api key - */ - public String getApiKey(); - - /** - * @return api secret - */ - public String getApiSecret(); -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/BusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/BusTopicSink.java deleted file mode 100644 index 99a600b0..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/BusTopicSink.java +++ /dev/null @@ -1,47 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm.bus; - -import org.onap.policy.drools.event.comm.TopicSink; - -/** - * Topic Sink over Bus Infrastructure (DMAAP/UEB) - */ -public interface BusTopicSink extends ApiKeyEnabled, TopicSink { - /** - * Log Failures after X number of retries - */ - public static final int DEFAULT_LOG_SEND_FAILURES_AFTER = 1; - - /** - * Sets the UEB partition key for published messages - * - * @param partitionKey the partition key - */ - public void setPartitionKey(String partitionKey); - - /** - * return the partition key in used by the system to publish messages - * - * @return the partition key - */ - public String getPartitionKey(); -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/BusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/BusTopicSource.java deleted file mode 100644 index 83d4e72c..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/BusTopicSource.java +++ /dev/null @@ -1,78 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm.bus; - -import org.onap.policy.drools.event.comm.TopicSource; - -/** - * Generic Topic Source for UEB/DMAAP Communication Infrastructure - * - */ -public interface BusTopicSource extends ApiKeyEnabled, TopicSource { - - /** - * Default Timeout fetching in milliseconds - */ - public static int DEFAULT_TIMEOUT_MS_FETCH = 15000; - - /** - * Default maximum number of messages fetch at the time - */ - public static int DEFAULT_LIMIT_FETCH = 100; - - /** - * Definition of No Timeout fetching - */ - public static int NO_TIMEOUT_MS_FETCH = -1; - - /** - * Definition of No limit fetching - */ - public static int NO_LIMIT_FETCH = -1; - - /** - * gets the consumer group - * - * @return consumer group - */ - public String getConsumerGroup(); - - /** - * gets the consumer instance - * - * @return consumer instance - */ - public String getConsumerInstance(); - - /** - * gets the fetch timeout - * - * @return fetch timeout - */ - public int getFetchTimeout(); - - /** - * gets the fetch limit - * - * @return fetch limit - */ - public int getFetchLimit(); -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSink.java deleted file mode 100644 index 982fcafa..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSink.java +++ /dev/null @@ -1,30 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm.bus; - -public interface DmaapTopicSink extends BusTopicSink { - - /** - * Factory of UebTopicWriter for instantiation and management purposes - */ - - public static final DmaapTopicSinkFactory factory = new IndexedDmaapTopicSinkFactory(); -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java deleted file mode 100644 index 5ff5084e..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java +++ /dev/null @@ -1,465 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm.bus; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; -import org.onap.policy.drools.event.comm.bus.internal.InlineDmaapTopicSink; -import org.onap.policy.drools.properties.PolicyProperties; - -/** - * DMAAP Topic Sink Factory - */ -public interface DmaapTopicSinkFactory { - public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS"; - public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT"; - public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS"; - public final String DME2_VERSION_PROPERTY = "Version"; - public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer"; - public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName"; - public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath"; - public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired"; - - /** - * Instantiates a new DMAAP Topic Sink - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * @param userName AAF user name - * @param password AAF password - * @param partitionKey Consumer Group - * @param environment DME2 environment - * @param aftEnvironment DME2 AFT environment - * @param partner DME2 Partner - * @param latitude DME2 latitude - * @param longitude DME2 longitude - * @param additionalProps additional properties to pass to DME2 - * @param managed is this sink endpoint managed? - * - * @return an DMAAP Topic Sink - * @throws IllegalArgumentException if invalid parameters are present - */ - public DmaapTopicSink build(List servers, - String topic, - String apiKey, - String apiSecret, - String userName, - String password, - String partitionKey, - String environment, - String aftEnvironment, - String partner, - String latitude, - String longitude, - Map additionalProps, - boolean managed, - boolean useHttps, - boolean allowSelfSignedCerts) ; - - /** - * Instantiates a new DMAAP Topic Sink - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * @param userName AAF user name - * @param password AAF password - * @param partitionKey Consumer Group - * @param managed is this sink endpoint managed? - * - * @return an DMAAP Topic Sink - * @throws IllegalArgumentException if invalid parameters are present - */ - public DmaapTopicSink build(List servers, - String topic, - String apiKey, - String apiSecret, - String userName, - String password, - String partitionKey, - boolean managed, - boolean useHttps, - boolean allowSelfSignedCerts); - - /** - * Creates an DMAAP Topic Sink based on properties files - * - * @param properties Properties containing initialization values - * - * @return an DMAAP Topic Sink - * @throws IllegalArgumentException if invalid parameters are present - */ - public List build(Properties properties); - - /** - * Instantiates a new DMAAP Topic Sink - * - * @param servers list of servers - * @param topic topic name - * - * @return an DMAAP Topic Sink - * @throws IllegalArgumentException if invalid parameters are present - */ - public DmaapTopicSink build(List servers, String topic); - - /** - * Destroys an DMAAP Topic Sink based on a topic - * - * @param topic topic name - * @throws IllegalArgumentException if invalid parameters are present - */ - public void destroy(String topic); - - /** - * gets an DMAAP Topic Sink based on topic name - * @param topic the topic name - * - * @return an DMAAP Topic Sink with topic name - * @throws IllegalArgumentException if an invalid topic is provided - * @throws IllegalStateException if the DMAAP Topic Reader is - * an incorrect state - */ - public DmaapTopicSink get(String topic); - - /** - * Provides a snapshot of the DMAAP Topic Sinks - * @return a list of the DMAAP Topic Sinks - */ - public List inventory(); - - /** - * Destroys all DMAAP Topic Sinks - */ - public void destroy(); -} - -/* ------------- implementation ----------------- */ - -/** - * Factory of DMAAP Reader Topics indexed by topic name - */ -class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { - private static final String MISSING_TOPIC = "A topic must be provided"; - - /** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class); - - /** - * DMAAP Topic Name Index - */ - protected HashMap dmaapTopicWriters = new HashMap<>(); - - @Override - public DmaapTopicSink build(List servers, - String topic, - String apiKey, - String apiSecret, - String userName, - String password, - String partitionKey, - String environment, - String aftEnvironment, - String partner, - String latitude, - String longitude, - Map additionalProps, - boolean managed, - boolean useHttps, - boolean allowSelfSignedCerts) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (dmaapTopicWriters.containsKey(topic)) { - return dmaapTopicWriters.get(topic); - } - - DmaapTopicSink dmaapTopicSink = - new InlineDmaapTopicSink(servers, topic, - apiKey, apiSecret, - userName, password, - partitionKey, - environment, aftEnvironment, - partner, latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts); - - if (managed) - dmaapTopicWriters.put(topic, dmaapTopicSink); - return dmaapTopicSink; - } - } - - @Override - public DmaapTopicSink build(List servers, - String topic, - String apiKey, - String apiSecret, - String userName, - String password, - String partitionKey, - boolean managed, - boolean useHttps, boolean allowSelfSignedCerts) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (dmaapTopicWriters.containsKey(topic)) { - return dmaapTopicWriters.get(topic); - } - - DmaapTopicSink dmaapTopicSink = - new InlineDmaapTopicSink(servers, topic, - apiKey, apiSecret, - userName, password, - partitionKey, useHttps, allowSelfSignedCerts); - - if (managed) - dmaapTopicWriters.put(topic, dmaapTopicSink); - return dmaapTopicSink; - } - } - - @Override - public DmaapTopicSink build(List servers, String topic) { - return this.build(servers, topic, null, null, null, null, null, true, false, false); - } - - @Override - public List build(Properties properties) { - - String writeTopics = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS); - if (writeTopics == null || writeTopics.isEmpty()) { - logger.info("{}: no topic for DMaaP Sink", this); - return new ArrayList<>(); - } - - List writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*"))); - List newDmaapTopicSinks = new ArrayList<>(); - synchronized(this) { - for (String topic: writeTopicList) { - if (this.dmaapTopicWriters.containsKey(topic)) { - newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic)); - continue; - } - String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + - topic + - PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - List serverList; - if (servers != null && !servers.isEmpty()) - serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - else serverList = new ArrayList<>(); - - String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); - String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); - - String aafMechId = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX); - String aafPassword = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX); - - String partitionKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX); - - String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + - PolicyProperties.PROPERTY_MANAGED_SUFFIX); - - /* DME2 Properties */ - - String dme2Environment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - - String dme2AftEnvironment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - - String dme2Partner = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic - + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX); - - String dme2RouteOffer = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic - + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX); - - String dme2Latitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic - + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - - String dme2Longitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - - String dme2EpReadTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX); - - String dme2EpConnTimeout = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX); - - String dme2RoundtripTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX); - - String dme2Version = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic - + PolicyProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX); - - String dme2SubContextPath = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX); - - String dme2SessionStickinessRequired = properties - .getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic - + PolicyProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX); - - Map dme2AdditionalProps = new HashMap<>(); - - if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) - dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs); - if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) - dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout); - if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) - dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs); - if (dme2Version != null && !dme2Version.isEmpty()) - dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version); - if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) - dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) - dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath); - if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) - dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired); - - if (servers == null || servers.isEmpty()) { - logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); - continue; - } - - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + - PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - - //default is to use HTTP if no https property exists - boolean useHttps = false; - if (useHttpsString != null && !useHttpsString.isEmpty()){ - useHttps = Boolean.parseBoolean(useHttpsString); - } - - - String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + - PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); - - //default is to disallow self-signed certs - boolean allowSelfSignedCerts = false; - if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){ - allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); - } - - DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, - apiKey, apiSecret, - aafMechId, aafPassword, - partitionKey, - dme2Environment, dme2AftEnvironment, - dme2Partner, dme2Latitude, dme2Longitude, - dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts); - - newDmaapTopicSinks.add(dmaapTopicSink); - } - return newDmaapTopicSinks; - } - } - - @Override - public void destroy(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - DmaapTopicSink dmaapTopicWriter; - synchronized(this) { - if (!dmaapTopicWriters.containsKey(topic)) { - return; - } - - dmaapTopicWriter = dmaapTopicWriters.remove(topic); - } - - dmaapTopicWriter.shutdown(); - } - - @Override - public void destroy() { - List writers = this.inventory(); - for (DmaapTopicSink writer: writers) { - writer.shutdown(); - } - - synchronized(this) { - this.dmaapTopicWriters.clear(); - } - } - - @Override - public DmaapTopicSink get(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized(this) { - if (dmaapTopicWriters.containsKey(topic)) { - return dmaapTopicWriters.get(topic); - } else { - throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); - } - } - } - - @Override - public synchronized List inventory() { - return new ArrayList<>(this.dmaapTopicWriters.values()); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("IndexedDmaapTopicSinkFactory []"); - return builder.toString(); - } - -} \ No newline at end of file diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSource.java deleted file mode 100644 index 8d9329fa..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSource.java +++ /dev/null @@ -1,29 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm.bus; - -public interface DmaapTopicSource extends BusTopicSource { - - /** - * factory for managing and tracking DMAAP sources - */ - public static DmaapTopicSourceFactory factory = new IndexedDmaapTopicSourceFactory(); -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java deleted file mode 100644 index 5a8e2a72..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java +++ /dev/null @@ -1,587 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm.bus; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.onap.policy.drools.event.comm.bus.internal.SingleThreadedDmaapTopicSource; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; -import org.onap.policy.drools.properties.PolicyProperties; - -/** - * DMAAP Topic Source Factory - */ -public interface DmaapTopicSourceFactory { - public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS"; - public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT"; - public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS"; - public final String DME2_VERSION_PROPERTY = "Version"; - public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer"; - public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName"; - public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath"; - public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired"; - - /** - * Creates an DMAAP Topic Source based on properties files - * - * @param properties Properties containing initialization values - * - * @return an DMAAP Topic Source - * @throws IllegalArgumentException if invalid parameters are present - */ - public List build(Properties properties); - - /** - * Instantiates a new DMAAP Topic Source - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * @param userName user name - * @param password password - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Read Fetch Timeout - * @param fetchLimit Fetch Limit - * @param managed is this endpoind managed? - * @param useHttps does the connection use HTTPS? - * @param allowSelfSignedCerts does connection allow self-signed certificates? - * - * @return an DMAAP Topic Source - * @throws IllegalArgumentException if invalid parameters are present - */ - public DmaapTopicSource build(List servers, - String topic, - String apiKey, - String apiSecret, - String userName, - String password, - String consumerGroup, - String consumerInstance, - int fetchTimeout, - int fetchLimit, - boolean managed, - boolean useHttps, - boolean allowSelfSignedCerts); - - /** - * Instantiates a new DMAAP Topic Source - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * @param userName user name - * @param password password - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Read Fetch Timeout - * @param fetchLimit Fetch Limit - * @param environment DME2 environment - * @param aftEnvironment DME2 AFT environment - * @param partner DME2 Partner - * @param latitude DME2 latitude - * @param longitude DME2 longitude - * @param additionalProps additional properties to pass to DME2 - * @param managed is this endpoind managed? - * @param useHttps does the connection use HTTPS? - * @param allowSelfSignedCerts does connection allow self-signed certificates? - * - * @return an DMAAP Topic Source - * @throws IllegalArgumentException if invalid parameters are present - */ - public DmaapTopicSource build(List servers, - String topic, - String apiKey, - String apiSecret, - String userName, - String password, - String consumerGroup, - String consumerInstance, - int fetchTimeout, - int fetchLimit, - String environment, - String aftEnvironment, - String partner, - String latitude, - String longitude, - Map additionalProps, - boolean managed, - boolean useHttps, - boolean allowSelfSignedCerts); - - /** - * Instantiates a new DMAAP Topic Source - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * - * @return an DMAAP Topic Source - * @throws IllegalArgumentException if invalid parameters are present - */ - public DmaapTopicSource build(List servers, - String topic, - String apiKey, - String apiSecret); - - /** - * Instantiates a new DMAAP Topic Source - * - * @param servers list of servers - * @param topic topic name - * - * @return an DMAAP Topic Source - * @throws IllegalArgumentException if invalid parameters are present - */ - public DmaapTopicSource build(List servers, - String topic); - - /** - * Destroys an DMAAP Topic Source based on a topic - * - * @param topic topic name - * @throws IllegalArgumentException if invalid parameters are present - */ - public void destroy(String topic); - - /** - * Destroys all DMAAP Topic Sources - */ - public void destroy(); - - /** - * gets an DMAAP Topic Source based on topic name - * @param topic the topic name - * @return an DMAAP Topic Source with topic name - * @throws IllegalArgumentException if an invalid topic is provided - * @throws IllegalStateException if the DMAAP Topic Source is - * an incorrect state - */ - public DmaapTopicSource get(String topic); - - /** - * Provides a snapshot of the DMAAP Topic Sources - * @return a list of the DMAAP Topic Sources - */ - public List inventory(); -} - - -/* ------------- implementation ----------------- */ - -/** - * Factory of DMAAP Source Topics indexed by topic name - */ - -class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { - private static final String MISSING_TOPIC = "A topic must be provided"; - - /** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class); - - /** - * DMaaP Topic Name Index - */ - protected HashMap dmaapTopicSources = - new HashMap<>(); - - /** - * {@inheritDoc} - */ - @Override - public DmaapTopicSource build(List servers, - String topic, - String apiKey, - String apiSecret, - String userName, - String password, - String consumerGroup, - String consumerInstance, - int fetchTimeout, - int fetchLimit, - String environment, - String aftEnvironment, - String partner, - String latitude, - String longitude, - Map additionalProps, - boolean managed, - boolean useHttps, - boolean allowSelfSignedCerts) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized(this) { - if (dmaapTopicSources.containsKey(topic)) { - return dmaapTopicSources.get(topic); - } - - DmaapTopicSource dmaapTopicSource = - new SingleThreadedDmaapTopicSource(servers, topic, - apiKey, apiSecret, userName, password, - consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, - environment, aftEnvironment, partner, - latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts); - - if (managed) - dmaapTopicSources.put(topic, dmaapTopicSource); - - return dmaapTopicSource; - } - } - /** - * {@inheritDoc} - */ - @Override - public DmaapTopicSource build(List servers, - String topic, - String apiKey, - String apiSecret, - String userName, - String password, - String consumerGroup, - String consumerInstance, - int fetchTimeout, - int fetchLimit, - boolean managed, - boolean useHttps, - boolean allowSelfSignedCerts) { - - if (servers == null || servers.isEmpty()) { - throw new IllegalArgumentException("DMaaP Server(s) must be provided"); - } - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized(this) { - if (dmaapTopicSources.containsKey(topic)) { - return dmaapTopicSources.get(topic); - } - - DmaapTopicSource dmaapTopicSource = - new SingleThreadedDmaapTopicSource(servers, topic, - apiKey, apiSecret, userName, password, - consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts); - - if (managed) - dmaapTopicSources.put(topic, dmaapTopicSource); - - return dmaapTopicSource; - } - } - - /** - * {@inheritDoc} - */ - @Override - public List build(Properties properties) { - - String readTopics = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS); - if (readTopics == null || readTopics.isEmpty()) { - logger.info("{}: no topic for DMaaP Source", this); - return new ArrayList<>(); - } - List readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*"))); - - List dmaapTopicSourceLst = new ArrayList<>(); - synchronized(this) { - for (String topic: readTopicList) { - if (this.dmaapTopicSources.containsKey(topic)) { - dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + - topic + - PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - List serverList; - if (servers != null && !servers.isEmpty()) - serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - else serverList = new ArrayList<>(); - - String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); - - String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); - - String aafMechId = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX); - - String aafPassword = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX); - - String consumerGroup = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX); - - String consumerInstance = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX); - - String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); - - /* DME2 Properties */ - - String dme2Environment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - - String dme2AftEnvironment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - - String dme2Partner = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX); - - String dme2RouteOffer = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX); - - String dme2Latitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - - String dme2Longitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - - String dme2EpReadTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX); - - String dme2EpConnTimeout = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX); - - String dme2RoundtripTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX); - - String dme2Version = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX); - - String dme2SubContextPath = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX); - - String dme2SessionStickinessRequired = properties - .getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX); - - Map dme2AdditionalProps = new HashMap<>(); - - if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) - dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs); - if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) - dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout); - if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) - dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs); - if (dme2Version != null && !dme2Version.isEmpty()) - dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version); - if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) - dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) - dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath); - if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) - dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired); - - - if (servers == null || servers.isEmpty()) { - - logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); - continue; - } - - int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH; - if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) { - try { - fetchTimeout = Integer.parseInt(fetchTimeoutString); - } catch (NumberFormatException nfe) { - logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", - this, fetchTimeoutString, topic); - } - } - - String fetchLimitString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX); - int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH; - if (fetchLimitString != null && !fetchLimitString.isEmpty()) { - try { - fetchLimit = Integer.parseInt(fetchLimitString); - } catch (NumberFormatException nfe) { - logger.warn("{}: fetch limit {} is in invalid format for topic {} ", - this, fetchLimitString, topic); - } - } - - String managedString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + - PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - - //default is to use HTTP if no https property exists - boolean useHttps = false; - if (useHttpsString != null && !useHttpsString.isEmpty()){ - useHttps = Boolean.parseBoolean(useHttpsString); - } - - String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + - PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); - - //default is to disallow self-signed certs - boolean allowSelfSignedCerts = false; - if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){ - allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); - } - - - DmaapTopicSource uebTopicSource = this.build(serverList, topic, - apiKey, apiSecret, aafMechId, aafPassword, - consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, - dme2Environment, dme2AftEnvironment, dme2Partner, - dme2Latitude, dme2Longitude, dme2AdditionalProps, - managed, useHttps, allowSelfSignedCerts); - - dmaapTopicSourceLst.add(uebTopicSource); - } - } - return dmaapTopicSourceLst; - } - - /** - * {@inheritDoc} - * @throws IllegalArgumentException - */ - @Override - public DmaapTopicSource build(List servers, - String topic, - String apiKey, - String apiSecret) { - return this.build(servers, topic, - apiKey, apiSecret, null, null, - null, null, - DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, - DmaapTopicSource.DEFAULT_LIMIT_FETCH, - true, - false, - false); - } - - /** - * {@inheritDoc} - * @throws IllegalArgumentException - */ - @Override - public DmaapTopicSource build(List servers, String topic) { - return this.build(servers, topic, null, null); - } - - /** - * {@inheritDoc} - */ - @Override - public void destroy(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - DmaapTopicSource uebTopicSource; - - synchronized(this) { - if (!dmaapTopicSources.containsKey(topic)) { - return; - } - - uebTopicSource = dmaapTopicSources.remove(topic); - } - - uebTopicSource.shutdown(); - } - - /** - * {@inheritDoc} - */ - @Override - public DmaapTopicSource get(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized(this) { - if (dmaapTopicSources.containsKey(topic)) { - return dmaapTopicSources.get(topic); - } else { - throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found"); - } - } - } - - @Override - public synchronized List inventory() { - return new ArrayList<>(this.dmaapTopicSources.values()); - } - - @Override - public void destroy() { - List readers = this.inventory(); - for (DmaapTopicSource reader: readers) { - reader.shutdown(); - } - - synchronized(this) { - this.dmaapTopicSources.clear(); - } - } - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("IndexedDmaapTopicSourceFactory []"); - return builder.toString(); - } - -} - diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSink.java deleted file mode 100644 index afc11229..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSink.java +++ /dev/null @@ -1,126 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.policy.drools.event.comm.bus; - -import java.util.List; - -import org.onap.policy.drools.event.comm.TopicSink; -import org.onap.policy.drools.event.comm.bus.internal.TopicBase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * NOOP topic sink - */ -public class NoopTopicSink extends TopicBase implements TopicSink { - - /** - * factory - */ - public static final NoopTopicSinkFactory factory = new IndexedNoopTopicSinkFactory(); - - /** - * logger - */ - private static Logger logger = LoggerFactory.getLogger(NoopTopicSink.class); - - /** - * net logger - */ - private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER); - - /** - * constructor - * @param servers servers - * @param topic topic - * @throws IllegalArgumentException if an invalid argument has been passed in - */ - public NoopTopicSink(List servers, String topic) { - super(servers, topic); - } - - @Override - public boolean send(String message) { - - if (message == null || message.isEmpty()) - throw new IllegalArgumentException("Message to send is empty"); - - if (!this.alive) - throw new IllegalStateException(this + " is stopped"); - - try { - synchronized (this) { - this.recentEvents.add(message); - } - - netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(), - this.topic, System.lineSeparator(), message); - - broadcast(message); - } catch (Exception e) { - logger.warn("{}: cannot send because of {}", this, e.getMessage(), e); - return false; - } - - return true; - } - - @Override - public CommInfrastructure getTopicCommInfrastructure() { - return CommInfrastructure.NOOP; - } - - @Override - public boolean start() { - logger.info("{}: starting", this); - - synchronized(this) { - - if (this.alive) - return true; - - if (locked) - throw new IllegalStateException(this + " is locked."); - - this.alive = true; - } - - return true; - } - - @Override - public boolean stop() { - synchronized(this) { - this.alive = false; - } - return true; - } - - @Override - public void shutdown() { - this.stop(); - } - - @Override - public String toString() { - return "NoopTopicSink [toString()=" + super.toString() + "]"; - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java deleted file mode 100644 index 8633d093..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.policy.drools.event.comm.bus; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Properties; - -import org.onap.policy.drools.properties.PolicyProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Noop Topic Sink Factory - */ -public interface NoopTopicSinkFactory { - - /** - * Creates noop topic sinks based on properties files - * - * @param properties Properties containing initialization values - * - * @return a noop topic sink - * @throws IllegalArgumentException if invalid parameters are present - */ - public List build(Properties properties); - - /** - * builds a noop sink - * - * @param servers list of servers - * @param topic topic name - * @param managed is this sink endpoint managed? - * @return a noop topic sink - * @throws IllegalArgumentException if invalid parameters are present - */ - public NoopTopicSink build(List servers, String topic, boolean managed); - - /** - * Destroys a sink based on the topic - * - * @param topic topic name - * @throws IllegalArgumentException if invalid parameters are present - */ - public void destroy(String topic); - - /** - * gets a sink based on topic name - * - * @param topic the topic name - * - * @return a sink with topic name - * @throws IllegalArgumentException if an invalid topic is provided - * @throws IllegalStateException if the sink is in an incorrect state - */ - public NoopTopicSink get(String topic); - - /** - * Provides a snapshot of the UEB Topic Writers - * - * @return a list of the UEB Topic Writers - */ - public List inventory(); - - /** - * Destroys all sinks - */ - public void destroy(); -} - - -/* ------------- implementation ----------------- */ - -/** - * Factory of noop sinks - */ -class IndexedNoopTopicSinkFactory implements NoopTopicSinkFactory { - private static final String MISSING_TOPIC = "A topic must be provided"; - -/** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class); - - /** - * noop topic sinks map - */ - protected HashMap noopTopicSinks = new HashMap<>(); - - @Override - public List build(Properties properties) { - - final String sinkTopics = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS); - if (sinkTopics == null || sinkTopics.isEmpty()) { - logger.info("{}: no topic for noop sink", this); - return new ArrayList<>(); - } - - final List sinkTopicList = - new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*"))); - final List newSinks = new ArrayList<>(); - synchronized (this) { - for (final String topic : sinkTopicList) { - if (this.noopTopicSinks.containsKey(topic)) { - newSinks.add(this.noopTopicSinks.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "." - + topic + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - if (servers == null || servers.isEmpty()) - servers = "noop"; - - final List serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - - final String managedString = - properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "." + topic - + PolicyProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - final NoopTopicSink noopSink = this.build(serverList, topic, managed); - newSinks.add(noopSink); - } - return newSinks; - } - } - - @Override - public NoopTopicSink build(List servers, String topic, boolean managed) { - - List noopSinkServers = servers; - if (noopSinkServers == null) { - noopSinkServers = new ArrayList<>(); - } - - if (noopSinkServers.isEmpty()) { - noopSinkServers.add("noop"); - } - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (this.noopTopicSinks.containsKey(topic)) { - return this.noopTopicSinks.get(topic); - } - - final NoopTopicSink sink = new NoopTopicSink(noopSinkServers, topic); - - if (managed) - this.noopTopicSinks.put(topic, sink); - - return sink; - } - } - - @Override - public void destroy(String topic) { - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - NoopTopicSink noopSink; - synchronized (this) { - if (!this.noopTopicSinks.containsKey(topic)) { - return; - } - - noopSink = this.noopTopicSinks.remove(topic); - } - - noopSink.shutdown(); - } - - @Override - public void destroy() { - final List sinks = this.inventory(); - for (final NoopTopicSink sink : sinks) { - sink.shutdown(); - } - - synchronized (this) { - this.noopTopicSinks.clear(); - } - } - - @Override - public NoopTopicSink get(String topic) { - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (this.noopTopicSinks.containsKey(topic)) { - return this.noopTopicSinks.get(topic); - } else { - throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); - } - } - } - - @Override - public List inventory() { - return new ArrayList<>(this.noopTopicSinks.values()); - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSink.java deleted file mode 100644 index 57dd1f1a..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSink.java +++ /dev/null @@ -1,32 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm.bus; - -/** - * Topic Writer over UEB Infrastructure - */ -public interface UebTopicSink extends BusTopicSink { - - /** - * Factory of UEB Topic Sinks for instantiation and management purposes - */ - public static final UebTopicSinkFactory factory = new IndexedUebTopicSinkFactory(); -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSinkFactory.java deleted file mode 100644 index 10468bef..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSinkFactory.java +++ /dev/null @@ -1,306 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm.bus; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Properties; - -import org.onap.policy.drools.event.comm.bus.internal.InlineUebTopicSink; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; -import org.onap.policy.drools.properties.PolicyProperties; - -/** - * UEB Topic Sink Factory - */ -public interface UebTopicSinkFactory { - - /** - * Instantiates a new UEB Topic Writer - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * @param partitionKey Consumer Group - * @param managed is this sink endpoint managed? - * - * @return an UEB Topic Sink - * @throws IllegalArgumentException if invalid parameters are present - */ - public UebTopicSink build(List servers, - String topic, - String apiKey, - String apiSecret, - String partitionKey, - boolean managed, - boolean useHttps, - boolean allowSelfSignedCerts); - - /** - * Creates an UEB Topic Writer based on properties files - * - * @param properties Properties containing initialization values - * - * @return an UEB Topic Writer - * @throws IllegalArgumentException if invalid parameters are present - */ - public List build(Properties properties); - - /** - * Instantiates a new UEB Topic Writer - * - * @param servers list of servers - * @param topic topic name - * - * @return an UEB Topic Writer - * @throws IllegalArgumentException if invalid parameters are present - */ - public UebTopicSink build(List servers, String topic); - - /** - * Destroys an UEB Topic Writer based on a topic - * - * @param topic topic name - * @throws IllegalArgumentException if invalid parameters are present - */ - public void destroy(String topic); - - /** - * gets an UEB Topic Writer based on topic name - * @param topic the topic name - * - * @return an UEB Topic Writer with topic name - * @throws IllegalArgumentException if an invalid topic is provided - * @throws IllegalStateException if the UEB Topic Reader is - * an incorrect state - */ - public UebTopicSink get(String topic); - - /** - * Provides a snapshot of the UEB Topic Writers - * @return a list of the UEB Topic Writers - */ - public List inventory(); - - /** - * Destroys all UEB Topic Writers - */ - public void destroy(); -} - -/* ------------- implementation ----------------- */ - -/** - * Factory of UEB Reader Topics indexed by topic name - */ -class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { - private static final String MISSING_TOPIC = "A topic must be provided"; - - /** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class); - - /** - * UEB Topic Name Index - */ - protected HashMap uebTopicSinks = - new HashMap<>(); - - @Override - public UebTopicSink build(List servers, - String topic, - String apiKey, - String apiSecret, - String partitionKey, - boolean managed, - boolean useHttps, - boolean allowSelfSignedCerts) { - - if (servers == null || servers.isEmpty()) { - throw new IllegalArgumentException("UEB Server(s) must be provided"); - } - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (uebTopicSinks.containsKey(topic)) { - return uebTopicSinks.get(topic); - } - - UebTopicSink uebTopicWriter = - new InlineUebTopicSink(servers, topic, - apiKey, apiSecret,partitionKey, useHttps, allowSelfSignedCerts); - - if (managed) - uebTopicSinks.put(topic, uebTopicWriter); - - return uebTopicWriter; - } - } - - - @Override - public UebTopicSink build(List servers, String topic) { - return this.build(servers, topic, null, null, null, true, false, false); - } - - - @Override - public List build(Properties properties) { - - String writeTopics = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS); - if (writeTopics == null || writeTopics.isEmpty()) { - logger.info("{}: no topic for UEB Sink", this); - return new ArrayList<>(); - } - - List writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*"))); - List newUebTopicSinks = new ArrayList<>(); - synchronized(this) { - for (String topic: writeTopicList) { - if (this.uebTopicSinks.containsKey(topic)) { - newUebTopicSinks.add(this.uebTopicSinks.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + - topic + - PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - if (servers == null || servers.isEmpty()) { - logger.error("{}: no UEB servers configured for sink {}", this, topic); - continue; - } - - List serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - - String apiKey = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); - String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); - String partitionKey = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX); - - String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + - PolicyProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + - PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - - //default is to use HTTP if no https property exists - boolean useHttps = false; - if (useHttpsString != null && !useHttpsString.isEmpty()){ - useHttps = Boolean.parseBoolean(useHttpsString); - } - - - String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + - PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); - - //default is to disallow self-signed certs - boolean allowSelfSignedCerts = false; - if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){ - allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); - } - - UebTopicSink uebTopicWriter = this.build(serverList, topic, - apiKey, apiSecret, - partitionKey, managed, useHttps, allowSelfSignedCerts); - newUebTopicSinks.add(uebTopicWriter); - } - return newUebTopicSinks; - } - } - - @Override - public void destroy(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - UebTopicSink uebTopicWriter; - synchronized(this) { - if (!uebTopicSinks.containsKey(topic)) { - return; - } - - uebTopicWriter = uebTopicSinks.remove(topic); - } - - uebTopicWriter.shutdown(); - } - - @Override - public void destroy() { - List writers = this.inventory(); - for (UebTopicSink writer: writers) { - writer.shutdown(); - } - - synchronized(this) { - this.uebTopicSinks.clear(); - } - } - - @Override - public UebTopicSink get(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized(this) { - if (uebTopicSinks.containsKey(topic)) { - return uebTopicSinks.get(topic); - } else { - throw new IllegalStateException("UebTopicSink for " + topic + " not found"); - } - } - } - - @Override - public synchronized List inventory() { - return new ArrayList<>(this.uebTopicSinks.values()); - } - - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("IndexedUebTopicSinkFactory []"); - return builder.toString(); - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSource.java deleted file mode 100644 index 7d35a993..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSource.java +++ /dev/null @@ -1,34 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm.bus; - -/** - * Topic Source for UEB Communication Infrastructure - * - */ -public interface UebTopicSource extends BusTopicSource { - - /** - * factory for managing and tracking UEB readers - */ - public static UebTopicSourceFactory factory = - new IndexedUebTopicSourceFactory(); -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSourceFactory.java deleted file mode 100644 index d48be278..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSourceFactory.java +++ /dev/null @@ -1,394 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm.bus; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Properties; - -import org.onap.policy.drools.event.comm.bus.internal.SingleThreadedUebTopicSource; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; -import org.onap.policy.drools.properties.PolicyProperties; - -/** - * UEB Topic Source Factory - */ -public interface UebTopicSourceFactory { - - /** - * Creates an UEB Topic Source based on properties files - * - * @param properties Properties containing initialization values - * - * @return an UEB Topic Source - * @throws IllegalArgumentException if invalid parameters are present - */ - public List build(Properties properties); - - /** - * Instantiates a new UEB Topic Source - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Read Fetch Timeout - * @param fetchLimit Fetch Limit - * @param managed is this source endpoint managed? - * - * @return an UEB Topic Source - * @throws IllegalArgumentException if invalid parameters are present - */ - public UebTopicSource build(List servers, - String topic, - String apiKey, - String apiSecret, - String consumerGroup, - String consumerInstance, - int fetchTimeout, - int fetchLimit, - boolean managed, - boolean useHttps, - boolean allowSelfSignedCerts); - - /** - * Instantiates a new UEB Topic Source - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * - * @return an UEB Topic Source - * @throws IllegalArgumentException if invalid parameters are present - */ - public UebTopicSource build(List servers, - String topic, - String apiKey, - String apiSecret); - - /** - * Instantiates a new UEB Topic Source - * - * @param servers list of servers - * @param topic topic name - * - * @return an UEB Topic Source - * @throws IllegalArgumentException if invalid parameters are present - */ - public UebTopicSource build(List servers, - String topic); - - /** - * Destroys an UEB Topic Source based on a topic - * - * @param topic topic name - * @throws IllegalArgumentException if invalid parameters are present - */ - public void destroy(String topic); - - /** - * Destroys all UEB Topic Sources - */ - public void destroy(); - - /** - * gets an UEB Topic Source based on topic name - * @param topic the topic name - * @return an UEB Topic Source with topic name - * @throws IllegalArgumentException if an invalid topic is provided - * @throws IllegalStateException if the UEB Topic Source is - * an incorrect state - */ - public UebTopicSource get(String topic); - - /** - * Provides a snapshot of the UEB Topic Sources - * @return a list of the UEB Topic Sources - */ - public List inventory(); -} - -/* ------------- implementation ----------------- */ - -/** - * Factory of UEB Source Topics indexed by topic name - */ -class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { - private static final String MISSING_TOPIC = "A topic must be provided"; - - /** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class); - - /** - * UEB Topic Name Index - */ - protected HashMap uebTopicSources = - new HashMap<>(); - - /** - * {@inheritDoc} - */ - @Override - public UebTopicSource build(List servers, - String topic, - String apiKey, - String apiSecret, - String consumerGroup, - String consumerInstance, - int fetchTimeout, - int fetchLimit, - boolean managed, - boolean useHttps, - boolean allowSelfSignedCerts) { - if (servers == null || servers.isEmpty()) { - throw new IllegalArgumentException("UEB Server(s) must be provided"); - } - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized(this) { - if (uebTopicSources.containsKey(topic)) { - return uebTopicSources.get(topic); - } - - UebTopicSource uebTopicSource = - new SingleThreadedUebTopicSource(servers, topic, - apiKey, apiSecret, - consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts); - - if (managed) - uebTopicSources.put(topic, uebTopicSource); - - return uebTopicSource; - } - } - - /** - * {@inheritDoc} - */ - @Override - public List build(Properties properties) { - - String readTopics = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS); - if (readTopics == null || readTopics.isEmpty()) { - logger.info("{}: no topic for UEB Source", this); - return new ArrayList<>(); - } - List readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*"))); - - List newUebTopicSources = new ArrayList<>(); - synchronized(this) { - for (String topic: readTopicList) { - if (this.uebTopicSources.containsKey(topic)) { - newUebTopicSources.add(this.uebTopicSources.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + - topic + - PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - if (servers == null || servers.isEmpty()) { - logger.error("{}: no UEB servers configured for sink {}", this, topic); - continue; - } - - List serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - - String apiKey = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); - - String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); - - String consumerGroup = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX); - - String consumerInstance = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX); - - String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); - int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH; - if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) { - try { - fetchTimeout = Integer.parseInt(fetchTimeoutString); - } catch (NumberFormatException nfe) { - logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", - this, fetchTimeoutString, topic); - } - } - - String fetchLimitString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + - "." + topic + - PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX); - int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH; - if (fetchLimitString != null && !fetchLimitString.isEmpty()) { - try { - fetchLimit = Integer.parseInt(fetchLimitString); - } catch (NumberFormatException nfe) { - logger.warn("{}: fetch limit {} is in invalid format for topic {} ", - this, fetchLimitString, topic); - } - } - - String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + - topic + PolicyProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + - PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - - //default is to use HTTP if no https property exists - boolean useHttps = false; - if (useHttpsString != null && !useHttpsString.isEmpty()){ - useHttps = Boolean.parseBoolean(useHttpsString); - } - - String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + - PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); - - //default is to disallow self-signed certs - boolean allowSelfSignedCerts = false; - if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){ - allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); - } - - UebTopicSource uebTopicSource = this.build(serverList, topic, - apiKey, apiSecret, - consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, managed, useHttps, allowSelfSignedCerts); - newUebTopicSources.add(uebTopicSource); - } - } - return newUebTopicSources; - } - - /** - * {@inheritDoc} - */ - @Override - public UebTopicSource build(List servers, - String topic, - String apiKey, - String apiSecret) { - - return this.build(servers, topic, - apiKey, apiSecret, - null, null, - UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH, - UebTopicSource.DEFAULT_LIMIT_FETCH, true, false, true); - } - - /** - * {@inheritDoc} - */ - @Override - public UebTopicSource build(List servers, String topic) { - return this.build(servers, topic, null, null); - } - - /** - * {@inheritDoc} - */ - @Override - public void destroy(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - UebTopicSource uebTopicSource; - - synchronized(this) { - if (!uebTopicSources.containsKey(topic)) { - return; - } - - uebTopicSource = uebTopicSources.remove(topic); - } - - uebTopicSource.shutdown(); - } - - /** - * {@inheritDoc} - */ - @Override - public UebTopicSource get(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized(this) { - if (uebTopicSources.containsKey(topic)) { - return uebTopicSources.get(topic); - } else { - throw new IllegalStateException("UebTopiceSource for " + topic + " not found"); - } - } - } - - @Override - public synchronized List inventory() { - return new ArrayList<>(this.uebTopicSources.values()); - } - - @Override - public void destroy() { - List readers = this.inventory(); - for (UebTopicSource reader: readers) { - reader.shutdown(); - } - - synchronized(this) { - this.uebTopicSources.clear(); - } - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("IndexedUebTopicSourceFactory []"); - return builder.toString(); - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java deleted file mode 100644 index 828bb920..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java +++ /dev/null @@ -1,546 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm.bus.internal; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import org.onap.policy.drools.event.comm.bus.DmaapTopicSinkFactory; -import org.onap.policy.drools.properties.PolicyProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -import com.att.nsa.cambria.client.CambriaConsumer; -import com.att.nsa.mr.client.MRClientFactory; -import com.att.nsa.mr.client.impl.MRConsumerImpl; -import com.att.nsa.mr.client.response.MRConsumerResponse; -import com.att.nsa.mr.test.clients.ProtocolTypeConstants; - -/** - * Wrapper around libraries to consume from message bus - * - */ -public interface BusConsumer { - - /** - * fetch messages - * - * @return list of messages - * @throws Exception when error encountered by underlying libraries - */ - public Iterable fetch() throws InterruptedException, IOException; - - /** - * close underlying library consumer - */ - public void close(); - - /** - * BusConsumer that supports server-side filtering. - */ - public interface FilterableBusConsumer extends BusConsumer { - - /** - * Sets the server-side filter. - * - * @param filter new filter value, or {@code null} - * @throws IllegalArgumentException if the consumer cannot be built with - * the new filter - */ - public void setFilter(String filter); - } - - /** - * Cambria based consumer - */ - public static class CambriaConsumerWrapper implements FilterableBusConsumer { - - /** - * logger - */ - private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class); - - /** - * Used to build the consumer. - */ - private final ConsumerBuilder builder; - - /** - * Locked while updating {@link #consumer} and {@link #newConsumer}. - */ - private final Object consLocker = new Object(); - - /** - * Cambria client - */ - private CambriaConsumer consumer; - - /** - * Cambria client to use for next fetch - */ - private CambriaConsumer newConsumer = null; - - /** - * fetch timeout - */ - protected int fetchTimeout; - - /** - * close condition - */ - protected Object closeCondition = new Object(); - - /** - * Cambria Consumer Wrapper - * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit - * @throws GeneralSecurityException - * @throws MalformedURLException - */ - public CambriaConsumerWrapper(List servers, String topic, String apiKey, - String apiSecret, String consumerGroup, String consumerInstance, int fetchTimeout, - int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) { - this(servers, topic, apiKey, apiSecret, null, null, - consumerGroup, consumerInstance, fetchTimeout, fetchLimit, - useHttps, useSelfSignedCerts); - } - - public CambriaConsumerWrapper(List servers, String topic, String apiKey, - String apiSecret, String username, String password, - String consumerGroup, String consumerInstance, int fetchTimeout, - int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) { - - this.fetchTimeout = fetchTimeout; - - this.builder = new CambriaClientBuilders.ConsumerBuilder(); - - builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) - .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit); - - // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable) - builder.withSocketTimeout(fetchTimeout + 30000); - - if (useHttps) { - builder.usingHttps(); - - if (useSelfSignedCerts) { - builder.allowSelfSignedCertificates(); - } - } - - if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { - builder.authenticatedBy(apiKey, apiSecret); - } - - if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) { - builder.authenticatedByHttp(username, password); - } - - try { - this.consumer = builder.build(); - } catch (MalformedURLException | GeneralSecurityException e) { - throw new IllegalArgumentException(e); - } - } - - @Override - public Iterable fetch() throws IOException, InterruptedException { - try { - return getCurrentConsumer().fetch(); - } catch (final IOException e) { - logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(), - this.fetchTimeout); - synchronized (this.closeCondition) { - this.closeCondition.wait(this.fetchTimeout); - } - - throw e; - } - } - - @Override - public void close() { - synchronized (closeCondition) { - closeCondition.notifyAll(); - } - - getCurrentConsumer().close(); - } - - private CambriaConsumer getCurrentConsumer() { - CambriaConsumer old = null; - CambriaConsumer ret; - - synchronized(consLocker) { - if(this.newConsumer != null) { - // replace old consumer with new consumer - old = this.consumer; - this.consumer = this.newConsumer; - this.newConsumer = null; - } - - ret = this.consumer; - } - - if(old != null) { - old.close(); - } - - return ret; - } - - @Override - public void setFilter(String filter) { - logger.info("{}: setting DMAAP server-side filter: {}", this, filter); - builder.withServerSideFilter(filter); - - try { - CambriaConsumer previous; - synchronized(consLocker) { - previous = this.newConsumer; - this.newConsumer = builder.build(); - } - - if(previous != null) { - // there was already a new consumer - close it - previous.close(); - } - - } catch (MalformedURLException | GeneralSecurityException e) { - /* - * Since an exception occurred, "consumer" still has its old value, - * thus it should not be closed at this point. - */ - throw new IllegalArgumentException(e); - } - } - - @Override - public String toString() { - return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]"; - } - } - - /** - * MR based consumer - */ - public abstract class DmaapConsumerWrapper implements BusConsumer { - - /** - * logger - */ - private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class); - - /** - * Name of the "protocol" property. - */ - protected static final String PROTOCOL_PROP = "Protocol"; - - /** - * fetch timeout - */ - protected int fetchTimeout; - - /** - * close condition - */ - protected Object closeCondition = new Object(); - - /** - * MR Consumer - */ - protected MRConsumerImpl consumer; - - /** - * MR Consumer Wrapper - * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param username AAF Login - * @param password AAF Password - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit - * @throws MalformedURLException - */ - public DmaapConsumerWrapper(List servers, String topic, String apiKey, String apiSecret, - String username, String password, String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit) throws MalformedURLException { - - this.fetchTimeout = fetchTimeout; - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException("No topic for DMaaP"); - } - - this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, null, apiKey, apiSecret); - - this.consumer.setUsername(username); - this.consumer.setPassword(password); - } - - @Override - public Iterable fetch() throws InterruptedException, IOException { - final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse(); - if (response == null) { - logger.warn("{}: DMaaP NULL response received", this); - - synchronized (closeCondition) { - closeCondition.wait(fetchTimeout); - } - return new ArrayList<>(); - } else { - logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(), - response.getResponseMessage()); - - if (response.getResponseCode() == null || !"200".equals(response.getResponseCode())) { - - logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(), - response.getResponseMessage()); - - synchronized (closeCondition) { - closeCondition.wait(fetchTimeout); - } - - /* fall through */ - } - } - - if (response.getActualMessages() == null) - return new ArrayList<>(); - else - return response.getActualMessages(); - } - - @Override - public void close() { - synchronized (closeCondition) { - closeCondition.notifyAll(); - } - - this.consumer.close(); - } - - @Override - public String toString() { - return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate() - + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" - + consumer.getHost() + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() - + ", consumer.getUsername()=" + consumer.getUsername() + "]"; - } - } - - /** - * MR based consumer - */ - public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper { - - private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class); - - private final Properties props; - - /** - * MR Consumer Wrapper - * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param aafLogin AAF Login - * @param aafPassword AAF Password - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit - * @throws MalformedURLException - */ - public DmaapAafConsumerWrapper(List servers, String topic, String apiKey, - String apiSecret, String aafLogin, String aafPassword, String consumerGroup, - String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps) - throws MalformedURLException { - - super(servers, topic, apiKey, apiSecret, aafLogin, aafPassword, consumerGroup, - consumerInstance, fetchTimeout, fetchLimit); - - // super constructor sets servers = {""} if empty to avoid errors when using DME2 - if ((servers.size() == 1 && ("".equals(servers.get(0)))) || (servers == null) - || (servers.isEmpty())) { - throw new IllegalArgumentException("Must provide at least one host for HTTP AAF"); - } - - this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - - props = new Properties(); - - if (useHttps) { - props.setProperty(PROTOCOL_PROP, "https"); - this.consumer.setHost(servers.get(0) + ":3905"); - - } else { - props.setProperty(PROTOCOL_PROP, "http"); - this.consumer.setHost(servers.get(0) + ":3904"); - } - - this.consumer.setProps(props); - logger.info("{}: CREATION", this); - } - - @Override - public String toString() { - final MRConsumerImpl consumer = this.consumer; - - return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate() - + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" - + consumer.getHost() + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() - + ", consumer.getUsername()=" + consumer.getUsername() + "]"; - } - } - - public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper { - - private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class); - - private final Properties props; - - public DmaapDmeConsumerWrapper(List servers, String topic, String apiKey, - String apiSecret, String dme2Login, String dme2Password, String consumerGroup, - String consumerInstance, int fetchTimeout, int fetchLimit, String environment, - String aftEnvironment, String dme2Partner, String latitude, String longitude, - Map additionalProps, boolean useHttps) throws MalformedURLException { - - - - super(servers, topic, apiKey, apiSecret, dme2Login, dme2Password, consumerGroup, - consumerInstance, fetchTimeout, fetchLimit); - - - final String dme2RouteOffer = - additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); - - if (environment == null || environment.isEmpty()) { - throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - } - if (aftEnvironment == null || aftEnvironment.isEmpty()) { - throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - } - if (latitude == null || latitude.isEmpty()) { - throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - } - if (longitude == null || longitude.isEmpty()) { - throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - } - - if ((dme2Partner == null || dme2Partner.isEmpty()) - && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { - throw new IllegalArgumentException( - "Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " - + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); - } - - final String serviceName = servers.get(0); - - this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); - - this.consumer.setUsername(dme2Login); - this.consumer.setPassword(dme2Password); - - props = new Properties(); - - props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); - - props.setProperty("username", dme2Login); - props.setProperty("password", dme2Password); - - /* These are required, no defaults */ - props.setProperty("topic", topic); - - props.setProperty("Environment", environment); - props.setProperty("AFT_ENVIRONMENT", aftEnvironment); - - if (dme2Partner != null) - props.setProperty("Partner", dme2Partner); - if (dme2RouteOffer != null) - props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - - props.setProperty("Latitude", latitude); - props.setProperty("Longitude", longitude); - - /* These are optional, will default to these values if not set in additionalProps */ - props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); - props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); - props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); - props.setProperty("Version", "1.0"); - props.setProperty("SubContextPath", "/"); - props.setProperty("sessionstickinessrequired", "no"); - - /* These should not change */ - props.setProperty("TransportType", "DME2"); - props.setProperty("MethodType", "GET"); - - if (useHttps) { - props.setProperty(PROTOCOL_PROP, "https"); - - } else { - props.setProperty(PROTOCOL_PROP, "http"); - } - - props.setProperty("contenttype", "application/json"); - - if (additionalProps != null) { - for (Map.Entry entry : additionalProps.entrySet()) - props.put(entry.getKey(), entry.getValue()); - } - - MRClientFactory.prop = props; - this.consumer.setProps(props); - - logger.info("{}: CREATION", this); - } - - private IllegalArgumentException parmException(String topic, String propnm) { - return new IllegalArgumentException( - "Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + propnm + " property for DME2 in DMaaP"); - - } - } -} - - diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java deleted file mode 100644 index 1efaa063..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java +++ /dev/null @@ -1,391 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm.bus.internal; - -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.TimeUnit; - -import org.onap.policy.drools.event.comm.bus.DmaapTopicSinkFactory; -import org.onap.policy.drools.properties.PolicyProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; -import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; -import com.att.nsa.mr.client.response.MRPublisherResponse; -import com.att.nsa.mr.test.clients.ProtocolTypeConstants; -import com.fasterxml.jackson.annotation.JsonIgnore; - -public interface BusPublisher { - - /** - * sends a message - * - * @param partition id - * @param message the message - * @return true if success, false otherwise - * @throws IllegalArgumentException if no message provided - */ - public boolean send(String partitionId, String message); - - /** - * closes the publisher - */ - public void close(); - - /** - * Cambria based library publisher - */ - public static class CambriaPublisherWrapper implements BusPublisher { - - private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class); - - /** - * The actual Cambria publisher - */ - @JsonIgnore - protected volatile CambriaBatchingPublisher publisher; - - public CambriaPublisherWrapper(List servers, String topic, - String apiKey, - String apiSecret, boolean useHttps) { - this(servers, topic, apiKey, apiSecret, null, null, useHttps); - } - - public CambriaPublisherWrapper(List servers, String topic, - String apiKey, String apiSecret, - String username, String password, - boolean useHttps) { - - PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder(); - - builder.usingHosts(servers).onTopic(topic); - - // Set read timeout to 30 seconds (TBD: this should be configurable) - builder.withSocketTimeout(30000); - - if (useHttps){ - builder.usingHttps(); - } - - - if (apiKey != null && !apiKey.isEmpty() && - apiSecret != null && !apiSecret.isEmpty()) { - builder.authenticatedBy(apiKey, apiSecret); - } - - if (username != null && !username.isEmpty() && - password != null && !password.isEmpty()) { - builder.authenticatedByHttp(username, password); - } - - try { - this.publisher = builder.build(); - } catch (MalformedURLException | GeneralSecurityException e) { - throw new IllegalArgumentException(e); - } - } - - /** - * {@inheritDoc} - */ - @Override - public boolean send(String partitionId, String message) { - if (message == null) - throw new IllegalArgumentException("No message provided"); - - try { - this.publisher.send(partitionId, message); - } catch (Exception e) { - logger.warn("{}: SEND of {} cannot be performed because of {}", - this, message, e.getMessage(), e); - return false; - } - return true; - } - - /** - * {@inheritDoc} - */ - @Override - public void close() { - logger.info("{}: CLOSE", this); - - try { - this.publisher.close(); - } catch (Exception e) { - logger.warn("{}: CLOSE FAILED because of {}", - this, e.getMessage(),e); - } - } - - - @Override - public String toString() { - return "CambriaPublisherWrapper []"; - } - - } - - /** - * DmaapClient library wrapper - */ - public abstract class DmaapPublisherWrapper implements BusPublisher { - - private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class); - - /** - * MR based Publisher - */ - protected MRSimplerBatchPublisher publisher; - protected Properties props; - - /** - * MR Publisher Wrapper - * - * @param servers messaging bus hosts - * @param topic topic - * @param username AAF or DME2 Login - * @param password AAF or DME2 Password - */ - public DmaapPublisherWrapper(ProtocolTypeConstants protocol, - List servers, String topic, - String username, - String password, boolean useHttps) { - - - if (topic == null || topic.isEmpty()) - throw new IllegalArgumentException("No topic for DMaaP"); - - - if (protocol == ProtocolTypeConstants.AAF_AUTH) { - if (servers == null || servers.isEmpty()) - throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided"); - - ArrayList dmaapServers = new ArrayList<>(); - if(useHttps){ - for (String server: servers) { - dmaapServers.add(server + ":3905"); - } - - } - else{ - for (String server: servers) { - dmaapServers.add(server + ":3904"); - } - } - - - this.publisher = - new MRSimplerBatchPublisher.Builder(). - againstUrls(dmaapServers). - onTopic(topic). - build(); - - this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - } else if (protocol == ProtocolTypeConstants.DME2) { - ArrayList dmaapServers = new ArrayList<>(); - dmaapServers.add("0.0.0.0:3904"); - - this.publisher = - new MRSimplerBatchPublisher.Builder(). - againstUrls(dmaapServers). - onTopic(topic). - build(); - - this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); - } - - this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName())); - - this.publisher.setUsername(username); - this.publisher.setPassword(password); - - props = new Properties(); - - if (useHttps) { - props.setProperty("Protocol", "https"); - } else { - props.setProperty("Protocol", "http"); - } - - props.setProperty("contenttype", "application/json"); - props.setProperty("username", username); - props.setProperty("password", password); - - props.setProperty("topic", topic); - - this.publisher.setProps(props); - - if (protocol == ProtocolTypeConstants.AAF_AUTH) - this.publisher.setHost(servers.get(0)); - - logger.info("{}: CREATION: using protocol {}", this, protocol.getValue()); - } - - /** - * {@inheritDoc} - */ - @Override - public void close() { - logger.info("{}: CLOSE", this); - - try { - this.publisher.close(1, TimeUnit.SECONDS); - } catch (Exception e) { - logger.warn("{}: CLOSE FAILED because of {}", - this, e.getMessage(), e); - } - } - - /** - * {@inheritDoc} - */ - @Override - public boolean send(String partitionId, String message) { - if (message == null) - throw new IllegalArgumentException("No message provided"); - - this.publisher.setPubResponse(new MRPublisherResponse()); - this.publisher.send(partitionId, message); - MRPublisherResponse response = this.publisher.sendBatchWithResponse(); - if (response != null) { - logger.debug("DMaaP publisher received {} : {}", - response.getResponseCode(), - response.getResponseMessage()); - } - - return true; - } - - @Override - public String toString() { - return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate() - + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()=" - + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag() - + ", publisher.getUsername()=" + publisher.getUsername() + "]"; - } - } - - /** - * DmaapClient library wrapper - */ - public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper { - /** - * MR based Publisher - */ - public DmaapAafPublisherWrapper(List servers, String topic, - String aafLogin, - String aafPassword, boolean useHttps) { - - super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps); - } - } - - public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper { - public DmaapDmePublisherWrapper(List servers, String topic, - String username, String password, - String environment, String aftEnvironment, String dme2Partner, - String latitude, String longitude, Map additionalProps, boolean useHttps) { - - super(ProtocolTypeConstants.DME2, servers, topic, username, password, useHttps); - - - - - - - String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); - - if (environment == null || environment.isEmpty()) { - throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - } - if (aftEnvironment == null || aftEnvironment.isEmpty()) { - throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - } - if (latitude == null || latitude.isEmpty()) { - throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - } - if (longitude == null || longitude.isEmpty()) { - throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - } - - if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { - throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + - PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); - } - - String serviceName = servers.get(0); - - /* These are required, no defaults */ - props.setProperty("Environment", environment); - props.setProperty("AFT_ENVIRONMENT", aftEnvironment); - - props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); - - if (dme2Partner != null) - props.setProperty("Partner", dme2Partner); - if (dme2RouteOffer != null) - props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - - props.setProperty("Latitude", latitude); - props.setProperty("Longitude", longitude); - - // ServiceName also a default, found in additionalProps - - /* These are optional, will default to these values if not set in optionalProps */ - props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); - props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); - props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); - props.setProperty("Version", "1.0"); - props.setProperty("SubContextPath", "/"); - props.setProperty("sessionstickinessrequired", "no"); - - /* These should not change */ - props.setProperty("TransportType", "DME2"); - props.setProperty("MethodType", "POST"); - - for (Map.Entry entry : additionalProps.entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - - if (value != null) - props.setProperty(key, value); - } - - this.publisher.setProps(props); - } - - private IllegalArgumentException parmException(String topic, String propnm) { - return new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + - "." + topic + propnm + " property for DME2 in DMaaP"); - - } - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusTopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusTopicBase.java deleted file mode 100644 index 0bf3d445..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusTopicBase.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm.bus.internal; - -import java.util.List; - -import org.onap.policy.drools.event.comm.bus.ApiKeyEnabled; - -/** - * Bus Topic Base - */ -public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled { - - /** - * API Key - */ - protected String apiKey; - - /** - * API Secret - */ - protected String apiSecret; - - /** - * Use https - */ - protected boolean useHttps; - - /** - * allow self signed certificates - */ - protected boolean allowSelfSignedCerts; - - /** - * Instantiates a new Bus Topic Base - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * @param useHttps does connection use HTTPS? - * @param allowSelfSignedCerts are self-signed certificates allow - * - * @return a Bus Topic Base - * @throws IllegalArgumentException if invalid parameters are present - */ - public BusTopicBase(List servers, - String topic, - String apiKey, - String apiSecret, - boolean useHttps, - boolean allowSelfSignedCerts) { - - super(servers, topic); - - this.apiKey = apiKey; - this.apiSecret = apiSecret; - this.useHttps = useHttps; - this.allowSelfSignedCerts = allowSelfSignedCerts; - } - - @Override - public String getApiKey() { - return apiKey; - } - - @Override - public String getApiSecret() { - return apiSecret; - } - - /** - * @return if using https - */ - public boolean isUseHttps(){ - return useHttps; - } - - /** - * @return if self signed certificates are allowed - */ - public boolean isAllowSelfSignedCerts(){ - return allowSelfSignedCerts; - } - - protected boolean anyNullOrEmpty(String... args) { - for (String arg : args) { - if (arg == null || arg.isEmpty()) { - return true; - } - } - - return false; - } - - protected boolean allNullOrEmpty(String... args) { - for (String arg : args) { - if (!(arg == null || arg.isEmpty())) { - return false; - } - } - - return true; - } - - - @Override - public String toString() { - return "BusTopicBase [apiKey=" + apiKey + ", apiSecret=" + apiSecret + ", useHttps=" + useHttps - + ", allowSelfSignedCerts=" + allowSelfSignedCerts + ", toString()=" + super.toString() + "]"; - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java deleted file mode 100644 index a50d7b10..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm.bus.internal; - -import java.util.List; -import java.util.UUID; - -import org.onap.policy.drools.event.comm.bus.BusTopicSink; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; - -/** - * Transport Agnostic Bus Topic Sink to carry out the core functionality - * to interact with a sink regardless if it is UEB or DMaaP. - * - */ -public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink { - - /** - * loggers - */ - private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class); - private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER); - - /** - * The partition key to publish to - */ - protected String partitionId; - - /** - * message bus publisher - */ - protected BusPublisher publisher; - - /** - * constructor for abstract sink - * - * @param servers servers - * @param topic topic - * @param apiKey api secret - * @param apiSecret api secret - * @param partitionId partition id - * @param useHttps does connection use HTTPS? - * @param allowSelfSignedCerts are self-signed certificates allow - * @throws IllegalArgumentException in invalid parameters are passed in - */ - public InlineBusTopicSink(List servers, String topic, - String apiKey, String apiSecret, String partitionId, boolean useHttps, boolean allowSelfSignedCerts) { - - super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts); - - if (partitionId == null || partitionId.isEmpty()) { - this.partitionId = UUID.randomUUID ().toString(); - } - } - - /** - * Initialize the Bus publisher - */ - public abstract void init(); - - /** - * {@inheritDoc} - */ - @Override - public boolean start() { - logger.info("{}: starting", this); - - synchronized(this) { - - if (this.alive) - return true; - - if (locked) - throw new IllegalStateException(this + " is locked."); - - this.alive = true; - } - - this.init(); - return true; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean stop() { - - BusPublisher publisherCopy; - synchronized(this) { - this.alive = false; - publisherCopy = this.publisher; - this.publisher = null; - } - - if (publisherCopy != null) { - try { - publisherCopy.close(); - } catch (Exception e) { - logger.warn("{}: cannot stop publisher because of {}", - this, e.getMessage(), e); - } - } else { - logger.warn("{}: there is no publisher", this); - return false; - } - - return true; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean send(String message) { - - if (message == null || message.isEmpty()) { - throw new IllegalArgumentException("Message to send is empty"); - } - - if (!this.alive) { - throw new IllegalStateException(this + " is stopped"); - } - - try { - synchronized (this) { - this.recentEvents.add(message); - } - - netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(), - this.topic, System.lineSeparator(), message); - - publisher.send(this.partitionId, message); - broadcast(message); - } catch (Exception e) { - logger.warn("{}: cannot send because of {}", this, e.getMessage(), e); - return false; - } - - return true; - } - - - /** - * {@inheritDoc} - */ - @Override - public void setPartitionKey(String partitionKey) { - this.partitionId = partitionKey; - } - - /** - * {@inheritDoc} - */ - @Override - public String getPartitionKey() { - return this.partitionId; - } - - /** - * {@inheritDoc} - */ - @Override - public void shutdown() { - this.stop(); - } - - protected boolean anyNullOrEmpty(String... args) { - for (String arg : args) { - if (arg == null || arg.isEmpty()) { - return true; - } - } - - return false; - } - - protected boolean allNullOrEmpty(String... args) { - for (String arg : args) { - if (!(arg == null || arg.isEmpty())) { - return false; - } - } - - return true; - } - - - @Override - public String toString() { - return "InlineBusTopicSink [partitionId=" + partitionId + ", alive=" + alive + ", publisher=" + publisher + "]"; - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java deleted file mode 100644 index 48116e34..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java +++ /dev/null @@ -1,144 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm.bus.internal; - -import java.util.List; -import java.util.Map; - -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; -import org.onap.policy.drools.event.comm.Topic; -import org.onap.policy.drools.event.comm.bus.DmaapTopicSink; - -/** - * This implementation publishes events for the associated DMAAP topic, - * inline with the calling thread. - */ -public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTopicSink { - - protected static Logger logger = - LoggerFactory.getLogger(InlineDmaapTopicSink.class); - - protected final String userName; - protected final String password; - - protected String environment = null; - protected String aftEnvironment = null; - protected String partner = null; - protected String latitude = null; - protected String longitude = null; - - protected Map additionalProps = null; - - /** - * - * @param servers DMaaP servers - * @param topic DMaaP Topic to be monitored - * @param apiKey DMaaP API Key (optional) - * @param apiSecret DMaaP API Secret (optional) - * @param consumerGroup DMaaP Reader Consumer Group - * @param consumerInstance DMaaP Reader Instance - * @param fetchTimeout DMaaP fetch timeout - * @param fetchLimit DMaaP fetch limit - * @param environment DME2 Environment - * @param aftEnvironment DME2 AFT Environment - * @param partner DME2 Partner - * @param latitude DME2 Latitude - * @param longitude DME2 Longitude - * @param additionalProps Additional properties to pass to DME2 - * @param useHttps does connection use HTTPS? - * @param allowSelfSignedCerts are self-signed certificates allow - * - * @throws IllegalArgumentException An invalid parameter passed in - */ - public InlineDmaapTopicSink(List servers, String topic, - String apiKey, String apiSecret, - String userName, String password, - String partitionKey, - String environment, String aftEnvironment, String partner, - String latitude, String longitude, Map additionalProps, - boolean useHttps, boolean allowSelfSignedCerts) { - - super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts); - - this.userName = userName; - this.password = password; - - this.environment = environment; - this.aftEnvironment = aftEnvironment; - this.partner = partner; - - this.latitude = latitude; - this.longitude = longitude; - - this.additionalProps = additionalProps; - } - - public InlineDmaapTopicSink(List servers, String topic, - String apiKey, String apiSecret, - String userName, String password, - String partitionKey, boolean useHttps, boolean allowSelfSignedCerts) { - - super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts); - - this.userName = userName; - this.password = password; - } - - - @Override - public void init() { - if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) { - this.publisher = - new BusPublisher.CambriaPublisherWrapper(this.servers, - this.topic, - this.apiKey, this.apiSecret, - this.userName, this.password, - this.useHttps); - } else { - this.publisher = - new BusPublisher.DmaapDmePublisherWrapper(this.servers, this.topic, - this.userName, this.password, - this.environment, this.aftEnvironment, - this.partner, this.latitude, this.longitude, - this.additionalProps, this.useHttps); - } - - logger.info("{}: DMAAP SINK created", this); - } - - /** - * {@inheritDoc} - */ - @Override - public CommInfrastructure getTopicCommInfrastructure() { - return Topic.CommInfrastructure.DMAAP; - } - - - @Override - public String toString() { - return "InlineDmaapTopicSink [userName=" + userName + ", password=" + password - + ", getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()=" - + super.toString() + "]"; - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java deleted file mode 100644 index d1218f3f..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm.bus.internal; - -import java.util.List; - -import org.onap.policy.drools.event.comm.Topic; -import org.onap.policy.drools.event.comm.bus.UebTopicSink; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; - -/** - * This implementation publishes events for the associated UEB topic, - * inline with the calling thread. - */ -public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSink { - - /** - * logger - */ - private static Logger logger = LoggerFactory.getLogger(InlineUebTopicSink.class); - - /** - * Argument-based UEB Topic Writer instantiation - * - * @param servers list of UEB servers available for publishing - * @param topic the topic to publish to - * @param apiKey the api key (optional) - * @param apiSecret the api secret (optional) - * @param partitionId the partition key (optional, autogenerated if not provided) - * @param useHttps does connection use HTTPS? - * @param allowSelfSignedCerts are self-signed certificates allow - * - * @throws IllegalArgumentException if invalid arguments are detected - */ - public InlineUebTopicSink(List servers, - String topic, - String apiKey, - String apiSecret, - String partitionId, - boolean useHttps, - boolean allowSelfSignedCerts) { - super(servers, topic, apiKey, apiSecret, partitionId, useHttps, allowSelfSignedCerts); - } - - /** - * Instantiation of internal resources - */ - @Override - public void init() { - - this.publisher = - new BusPublisher.CambriaPublisherWrapper(this.servers, - this.topic, - this.apiKey, - this.apiSecret, - this.useHttps); - logger.info("{}: UEB SINK created", this); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("InlineUebTopicSink [getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()) - .append(", toString()=").append(super.toString()).append("]"); - return builder.toString(); - } - - /** - * {@inheritDoc} - */ - @Override - public CommInfrastructure getTopicCommInfrastructure() { - return Topic.CommInfrastructure.UEB; - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java deleted file mode 100644 index 768046d0..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ /dev/null @@ -1,332 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm.bus.internal; - -import java.net.MalformedURLException; -import java.util.List; -import java.util.UUID; -import org.onap.policy.drools.event.comm.FilterableTopicSource; -import org.onap.policy.drools.event.comm.TopicListener; -import org.onap.policy.drools.event.comm.bus.BusTopicSource; -import org.onap.policy.drools.event.comm.bus.internal.BusConsumer.FilterableBusConsumer; -import org.onap.policy.drools.utils.NetworkUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This topic source implementation specializes in reading messages - * over a bus topic source and notifying its listeners - */ -public abstract class SingleThreadedBusTopicSource - extends BusTopicBase - implements Runnable, BusTopicSource, FilterableTopicSource { - - /** - * Not to be converted to PolicyLogger. - * This will contain all instract /out traffic and only that in a single file in a concise format. - */ - private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class); - private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER); - - /** - * Bus consumer group - */ - protected final String consumerGroup; - - /** - * Bus consumer instance - */ - protected final String consumerInstance; - - /** - * Bus fetch timeout - */ - protected final int fetchTimeout; - - /** - * Bus fetch limit - */ - protected final int fetchLimit; - - /** - * Message Bus Consumer - */ - protected BusConsumer consumer; - - /** - * Independent thread reading message over my topic - */ - protected Thread busPollerThread; - - - /** - * - * @param servers Bus servers - * @param topic Bus Topic to be monitored - * @param apiKey Bus API Key (optional) - * @param apiSecret Bus API Secret (optional) - * @param consumerGroup Bus Reader Consumer Group - * @param consumerInstance Bus Reader Instance - * @param fetchTimeout Bus fetch timeout - * @param fetchLimit Bus fetch limit - * @param useHttps does the bus use https - * @param allowSelfSignedCerts are self-signed certificates allowed - * @throws IllegalArgumentException An invalid parameter passed in - */ - public SingleThreadedBusTopicSource(List servers, - String topic, - String apiKey, - String apiSecret, - String consumerGroup, - String consumerInstance, - int fetchTimeout, - int fetchLimit, - boolean useHttps, - boolean allowSelfSignedCerts) { - - super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts); - - if (consumerGroup == null || consumerGroup.isEmpty()) { - this.consumerGroup = UUID.randomUUID ().toString(); - } else { - this.consumerGroup = consumerGroup; - } - - if (consumerInstance == null || consumerInstance.isEmpty()) { - this.consumerInstance = NetworkUtil.getHostname(); - } else { - this.consumerInstance = consumerInstance; - } - - if (fetchTimeout <= 0) { - this.fetchTimeout = NO_TIMEOUT_MS_FETCH; - } else { - this.fetchTimeout = fetchTimeout; - } - - if (fetchLimit <= 0) { - this.fetchLimit = NO_LIMIT_FETCH; - } else { - this.fetchLimit = fetchLimit; - } - - } - - /** - * Initialize the Bus client - */ - public abstract void init() throws MalformedURLException; - - @Override - public void register(TopicListener topicListener) { - - super.register(topicListener); - - try { - if (!alive && !locked) - this.start(); - else - logger.info("{}: register: start not attempted", this); - } catch (Exception e) { - logger.warn("{}: cannot start after registration of because of: {}", - this, topicListener, e.getMessage(), e); - } - } - - @Override - public void unregister(TopicListener topicListener) { - boolean stop; - synchronized (this) { - super.unregister(topicListener); - stop = this.topicListeners.isEmpty(); - } - - if (stop) { - this.stop(); - } - } - - @Override - public boolean start() { - logger.info("{}: starting", this); - - synchronized(this) { - - if (alive) - return true; - - if (locked) - throw new IllegalStateException(this + " is locked."); - - if (this.busPollerThread == null || - !this.busPollerThread.isAlive() || - this.consumer == null) { - - try { - this.init(); - this.alive = true; - this.busPollerThread = new Thread(this); - this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic()); - busPollerThread.start(); - } catch (Exception e) { - logger.warn("{}: cannot start because of {}", this, e.getMessage(), e); - throw new IllegalStateException(e); - } - } - } - - return this.alive; - } - - @Override - public boolean stop() { - logger.info("{}: stopping", this); - - synchronized(this) { - BusConsumer consumerCopy = this.consumer; - - this.alive = false; - this.consumer = null; - - if (consumerCopy != null) { - try { - consumerCopy.close(); - } catch (Exception e) { - logger.warn("{}: stop failed because of {}", this, e.getMessage(), e); - } - } - } - - Thread.yield(); - - return true; - } - - /** - * Run thread method for the Bus Reader - */ - @Override - public void run() { - while (this.alive) { - try { - for (String event: this.consumer.fetch()) { - synchronized (this) { - this.recentEvents.add(event); - } - - netLogger.info("[IN|{}|{}]{}{}", - this.getTopicCommInfrastructure(), this.topic, - System.lineSeparator(), event); - - broadcast(event); - - if (!this.alive) - break; - } - } catch (Exception e) { - logger.error("{}: cannot fetch because of ", this, e.getMessage(), e); - } - } - - logger.info("{}: exiting thread", this); - } - - /** - * {@inheritDoc} - */ - @Override - public boolean offer(String event) { - if (!this.alive) { - throw new IllegalStateException(this + " is not alive."); - } - - synchronized (this) { - this.recentEvents.add(event); - } - - netLogger.info("[IN|{}|{}]{}{}",this.getTopicCommInfrastructure(),this.topic, - System.lineSeparator(), event); - - - return broadcast(event); - } - - - @Override - public void setFilter(String filter) { - if(consumer instanceof FilterableBusConsumer) { - ((FilterableBusConsumer) consumer).setFilter(filter); - - } else { - throw new UnsupportedOperationException("no server-side filtering for topic " + topic); - } - } - - @Override - public String toString() { - return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance - + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer=" - + this.consumer + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread - + ", topicListeners=" + topicListeners.size() + ", toString()=" + super.toString() + "]"; - } - - /** - * {@inheritDoc} - */ - @Override - public String getConsumerGroup() { - return consumerGroup; - } - - /** - * {@inheritDoc} - */ - @Override - public String getConsumerInstance() { - return consumerInstance; - } - - /** - * {@inheritDoc} - */ - @Override - public void shutdown() { - this.stop(); - this.topicListeners.clear(); - } - - /** - * {@inheritDoc} - */ - @Override - public int getFetchTimeout() { - return fetchTimeout; - } - - /** - * {@inheritDoc} - */ - @Override - public int getFetchLimit() { - return fetchLimit; - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java deleted file mode 100644 index 6a9a2d6d..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java +++ /dev/null @@ -1,197 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm.bus.internal; - -import java.net.MalformedURLException; -import java.util.List; -import java.util.Map; - -import org.onap.policy.drools.event.comm.Topic; -import org.onap.policy.drools.event.comm.bus.DmaapTopicSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This topic reader implementation specializes in reading messages - * over DMAAP topic and notifying its listeners - */ -public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource - implements DmaapTopicSource, Runnable { - - private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class); - - - protected final String userName; - protected final String password; - - protected String environment = null; - protected String aftEnvironment = null; - protected String partner = null; - protected String latitude = null; - protected String longitude = null; - - protected Map additionalProps = null; - - - /** - * - * @param servers DMaaP servers - * @param topic DMaaP Topic to be monitored - * @param apiKey DMaaP API Key (optional) - * @param apiSecret DMaaP API Secret (optional) - * @param consumerGroup DMaaP Reader Consumer Group - * @param consumerInstance DMaaP Reader Instance - * @param fetchTimeout DMaaP fetch timeout - * @param fetchLimit DMaaP fetch limit - * @param environment DME2 Environment - * @param aftEnvironment DME2 AFT Environment - * @param partner DME2 Partner - * @param latitude DME2 Latitude - * @param longitude DME2 Longitude - * @param additionalProps Additional properties to pass to DME2 - * @param useHttps does connection use HTTPS? - * @param allowSelfSignedCerts are self-signed certificates allow - * - * @throws IllegalArgumentException An invalid parameter passed in - */ - public SingleThreadedDmaapTopicSource(List servers, String topic, - String apiKey, String apiSecret, - String userName, String password, - String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit, - String environment, String aftEnvironment, String partner, - String latitude, String longitude, Map additionalProps, - boolean useHttps, boolean allowSelfSignedCerts) { - - super(servers, topic, apiKey, apiSecret, - consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts); - - this.userName = userName; - this.password = password; - - this.environment = environment; - this.aftEnvironment = aftEnvironment; - this.partner = partner; - - this.latitude = latitude; - this.longitude = longitude; - - this.additionalProps = additionalProps; - try { - this.init(); - } catch (Exception e) { - logger.error("ERROR during init of topic {}", this.topic); - throw new IllegalArgumentException(e); - } - } - - /** - * - * @param servers DMaaP servers - * @param topic DMaaP Topic to be monitored - * @param apiKey DMaaP API Key (optional) - * @param apiSecret DMaaP API Secret (optional) - * @param consumerGroup DMaaP Reader Consumer Group - * @param consumerInstance DMaaP Reader Instance - * @param fetchTimeout DMaaP fetch timeout - * @param fetchLimit DMaaP fetch limit - * @param useHttps does connection use HTTPS? - * @param allowSelfSignedCerts are self-signed certificates allow - * @throws IllegalArgumentException An invalid parameter passed in - */ - public SingleThreadedDmaapTopicSource(List servers, String topic, - String apiKey, String apiSecret, - String userName, String password, - String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) { - - - super(servers, topic, apiKey, apiSecret, - consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts); - - this.userName = userName; - this.password = password; - - try { - this.init(); - } catch (Exception e) { - logger.warn("dmaap-source: cannot create topic {} because of {}", topic, e.getMessage(), e); - throw new IllegalArgumentException(e); - } - } - - - /** - * Initialize the Cambria or MR Client - */ - @Override - public void init() throws MalformedURLException { - if (anyNullOrEmpty(this.userName, this.password)) { - this.consumer = - new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, - this.apiKey, this.apiSecret, - this.consumerGroup, this.consumerInstance, - this.fetchTimeout, this.fetchLimit, - this.useHttps, this.allowSelfSignedCerts); - } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) { - this.consumer = - new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, - this.apiKey, this.apiSecret, - this.userName, this.password, - this.consumerGroup, this.consumerInstance, - this.fetchTimeout, this.fetchLimit, - this.useHttps, this.allowSelfSignedCerts); - } else { - this.consumer = - new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic, - this.apiKey, this.apiSecret, - this.userName, this.password, - this.consumerGroup, this.consumerInstance, - this.fetchTimeout, this.fetchLimit, - this.environment, this.aftEnvironment, this.partner, - this.latitude, this.longitude, this.additionalProps, this.useHttps); - } - - logger.info("{}: INITTED", this); - } - - /** - * {@inheritDoc} - */ - @Override - public CommInfrastructure getTopicCommInfrastructure() { - return Topic.CommInfrastructure.DMAAP; - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=") - .append((password == null || password.isEmpty()) ? "-" : password.length()) - .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()) - .append(", toString()=").append(super.toString()).append("]"); - return builder.toString(); - } - - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java deleted file mode 100644 index fcbee631..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm.bus.internal; - -import java.util.List; - -import org.onap.policy.drools.event.comm.Topic; -import org.onap.policy.drools.event.comm.bus.UebTopicSource; - -/** - * This topic source implementation specializes in reading messages - * over an UEB Bus topic source and notifying its listeners - */ -public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource - implements UebTopicSource { - - /** - * - * @param servers UEB servers - * @param topic UEB Topic to be monitored - * @param apiKey UEB API Key (optional) - * @param apiSecret UEB API Secret (optional) - * @param consumerGroup UEB Reader Consumer Group - * @param consumerInstance UEB Reader Instance - * @param fetchTimeout UEB fetch timeout - * @param fetchLimit UEB fetch limit - * @param useHttps does topicSource use HTTPS? - * @param allowSelfSignedCerts does topicSource allow self-signed certs? - * - * @throws IllegalArgumentException An invalid parameter passed in - */ - - - public SingleThreadedUebTopicSource(List servers, String topic, - String apiKey, String apiSecret, - String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) { - - super(servers, topic, apiKey, apiSecret, - consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts); - - this.allowSelfSignedCerts = allowSelfSignedCerts; - - this.init(); - } - - /** - * Initialize the Cambria client - */ - @Override - public void init() { - this.consumer = - new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, - this.apiKey, this.apiSecret, - this.consumerGroup, this.consumerInstance, - this.fetchTimeout, this.fetchLimit, this.useHttps, this.allowSelfSignedCerts); - } - - /** - * {@inheritDoc} - */ - @Override - public CommInfrastructure getTopicCommInfrastructure() { - return Topic.CommInfrastructure.UEB; - } - - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("SingleThreadedUebTopicSource [getTopicCommInfrastructure()=") - .append(getTopicCommInfrastructure()).append(", toString()=").append(super.toString()).append("]"); - return builder.toString(); - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/TopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/TopicBase.java deleted file mode 100644 index 22c6b1d5..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/TopicBase.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.event.comm.bus.internal; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.collections4.queue.CircularFifoQueue; -import org.onap.policy.drools.event.comm.Topic; -import org.onap.policy.drools.event.comm.TopicListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public abstract class TopicBase implements Topic { - - /** - * logger - */ - private static Logger logger = LoggerFactory.getLogger(TopicBase.class); - - /** - * list of servers - */ - protected List servers; - - /** - * Topic - */ - protected String topic; - - /** - * event cache - */ - protected CircularFifoQueue recentEvents = new CircularFifoQueue<>(10); - - /** - * Am I running? - * reflects invocation of start()/stop() - * !locked & start() => alive - * stop() => !alive - */ - protected volatile boolean alive = false; - - /** - * Am I locked? - * reflects invocation of lock()/unlock() operations - * locked => !alive (but not in the other direction necessarily) - * locked => !offer, !run, !start, !stop (but this last one is obvious - * since locked => !alive) - */ - protected volatile boolean locked = false; - - /** - * All my subscribers for new message notifications - */ - protected final ArrayList topicListeners = new ArrayList<>(); - - /** - * Instantiates a new Topic Base - * - * @param servers list of servers - * @param topic topic name - * - * @return a Topic Base - * @throws IllegalArgumentException if invalid parameters are present - */ - public TopicBase(List servers, String topic) { - - if (servers == null || servers.isEmpty()) { - throw new IllegalArgumentException("Server(s) must be provided"); - } - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException("A Topic must be provided"); - } - - this.servers = servers; - this.topic = topic; - } - - @Override - public void register(TopicListener topicListener) { - - logger.info("{}: registering {}", this, topicListener); - - synchronized(this) { - if (topicListener == null) - throw new IllegalArgumentException("TopicListener must be provided"); - - for (TopicListener listener: this.topicListeners) { - if (listener == topicListener) return; - } - - this.topicListeners.add(topicListener); - } - } - - @Override - public void unregister(TopicListener topicListener) { - - logger.info("{}: unregistering {}", this, topicListener); - - synchronized (this) { - if (topicListener == null) - throw new IllegalArgumentException("TopicListener must be provided"); - - this.topicListeners.remove(topicListener); - } - } - - /** - * broadcast event to all listeners - * - * @param message the event - * @return true if all notifications are performed with no error, false otherwise - */ - protected boolean broadcast(String message) { - List snapshotListeners = this.snapshotTopicListeners(); - - boolean success = true; - for (TopicListener topicListener: snapshotListeners) { - try { - topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message); - } catch (Exception e) { - logger.warn("{}: notification error @ {} because of {}", - this, topicListener, e.getMessage(), e); - success = false; - } - } - return success; - } - - /** - * take a snapshot of current topic listeners - * - * @return the topic listeners - */ - protected synchronized List snapshotTopicListeners() { - @SuppressWarnings("unchecked") - List listeners = (List) topicListeners.clone(); - return listeners; - } - - @Override - public boolean lock() { - - logger.info("{}: locking", this); - - synchronized (this) { - if (this.locked) - return true; - - this.locked = true; - } - - return this.stop(); - } - - @Override - public boolean unlock() { - logger.info("{}: unlocking", this); - - synchronized(this) { - if (!this.locked) - return true; - - this.locked = false; - } - - try { - return this.start(); - } catch (Exception e) { - logger.warn("{}: cannot after unlocking because of {}", this, e.getMessage(), e); - return false; - } - } - - @Override - public boolean isLocked() { - return this.locked; - } - - @Override - public String getTopic() { - return topic; - } - - @Override - public boolean isAlive() { - return this.alive; - } - - @Override - public List getServers() { - return servers; - } - - @Override - public synchronized String[] getRecentEvents() { - String[] events = new String[recentEvents.size()]; - return recentEvents.toArray(events); - } - - - @Override - public String toString() { - return "TopicBase [servers=" + servers + ", topic=" + topic + ", #recentEvents=" + recentEvents.size() + ", locked=" - + locked + ", #topicListeners=" + topicListeners.size() + "]"; - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/http/client/HttpClient.java b/policy-endpoints/src/main/java/org/onap/policy/drools/http/client/HttpClient.java deleted file mode 100644 index e5becdf4..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/http/client/HttpClient.java +++ /dev/null @@ -1,48 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.policy.drools.http.client; - -import javax.ws.rs.core.Response; - -import org.onap.policy.drools.properties.Startable; - -public interface HttpClient extends Startable { - - public Response get(String path); - - public Response get(); - - public static T getBody(Response response, Class entityType) { - return response.readEntity(entityType); - } - - public String getName(); - public boolean isHttps(); - public boolean isSelfSignedCerts(); - public String getHostname(); - public int getPort(); - public String getBasePath(); - public String getUserName(); - public String getPassword(); - public String getBaseUrl(); - - - public static final HttpClientFactory factory = new IndexedHttpClientFactory(); -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/http/client/HttpClientFactory.java b/policy-endpoints/src/main/java/org/onap/policy/drools/http/client/HttpClientFactory.java deleted file mode 100644 index 1094a2fb..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/http/client/HttpClientFactory.java +++ /dev/null @@ -1,221 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.policy.drools.http.client; - -import java.security.KeyManagementException; -import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Properties; - -import org.onap.policy.drools.http.client.internal.JerseyClient; -import org.onap.policy.drools.properties.PolicyProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Http Client Factory - */ -public interface HttpClientFactory { - - /** - * build and http client with the following parameters - */ - public HttpClient build(String name, boolean https, - boolean selfSignedCerts, - String hostname, int port, - String baseUrl, String userName, - String password, boolean managed) - throws KeyManagementException, NoSuchAlgorithmException; - - /** - * build http client from properties - */ - public List build(Properties properties) - throws KeyManagementException, NoSuchAlgorithmException; - - /** - * get http client - * @param name the name - * @return the http client - */ - public HttpClient get(String name); - - /** - * list of http clients - * @return http clients - */ - public List inventory(); - - /** - * destroy by name - * @param name name - */ - public void destroy(String name); - - public void destroy(); -} - -/** - * http client factory implementation indexed by name - */ -class IndexedHttpClientFactory implements HttpClientFactory { - - /** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(IndexedHttpClientFactory.class); - - protected HashMap clients = new HashMap<>(); - - @Override - public synchronized HttpClient build(String name, boolean https, boolean selfSignedCerts, - String hostname, int port, - String baseUrl, String userName, String password, - boolean managed) - throws KeyManagementException, NoSuchAlgorithmException { - if (clients.containsKey(name)) - return clients.get(name); - - JerseyClient client = - new JerseyClient(name, https, selfSignedCerts, hostname, port, baseUrl, userName, password); - - if (managed) - clients.put(name, client); - - return client; - } - - @Override - public synchronized List build(Properties properties) - throws KeyManagementException, NoSuchAlgorithmException { - ArrayList clientList = new ArrayList<>(); - - String clientNames = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES); - if (clientNames == null || clientNames.isEmpty()) { - return clientList; - } - - List clientNameList = - new ArrayList<>(Arrays.asList(clientNames.split("\\s*,\\s*"))); - - for (String clientName : clientNameList) { - String httpsString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + - clientName + - PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - boolean https = false; - if (httpsString != null && !httpsString.isEmpty()) { - https = Boolean.parseBoolean(httpsString); - } - - String hostName = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + - clientName + - PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX); - - String servicePortString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + - clientName + - PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX); - int port; - try { - if (servicePortString == null || servicePortString.isEmpty()) { - continue; - } - port = Integer.parseInt(servicePortString); - } catch (NumberFormatException nfe) { - logger.error("http-client-factory: cannot parse port {}", servicePortString, nfe); - continue; - } - - String baseUrl = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + - clientName + - PolicyProperties.PROPERTY_HTTP_URL_SUFFIX); - - String userName = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + - clientName + - PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX); - - String password = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + - clientName + - PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX); - - String managedString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + - clientName + - PolicyProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - try { - HttpClient client = - this.build(clientName, https, https, hostName, port, baseUrl, - userName, password, managed); - clientList.add(client); - } catch (Exception e) { - logger.error("http-client-factory: cannot build client {}", clientName, e); - } - } - - return clientList; - } - - @Override - public synchronized HttpClient get(String name) { - if (clients.containsKey(name)) { - return clients.get(name); - } - - throw new IllegalArgumentException("Http Client " + name + " not found"); - } - - @Override - public synchronized List inventory() { - return new ArrayList<>(this.clients.values()); - } - - @Override - public synchronized void destroy(String name) { - if (!clients.containsKey(name)) { - return; - } - - HttpClient client = clients.remove(name); - try { - client.shutdown(); - } catch (IllegalStateException e) { - logger.error("http-client-factory: cannot shutdown client {}", client, e); - } - } - - @Override - public void destroy() { - List clientsInventory = this.inventory(); - for (HttpClient client: clientsInventory) { - client.shutdown(); - } - - synchronized(this) { - this.clients.clear(); - } - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/http/client/internal/JerseyClient.java b/policy-endpoints/src/main/java/org/onap/policy/drools/http/client/internal/JerseyClient.java deleted file mode 100644 index 6a254e2e..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/http/client/internal/JerseyClient.java +++ /dev/null @@ -1,249 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.http.client.internal; - -import java.security.KeyManagementException; -import java.security.NoSuchAlgorithmException; -import java.security.SecureRandom; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManager; -import javax.net.ssl.X509TrustManager; -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.core.Response; -import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature; -import org.onap.policy.drools.http.client.HttpClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.annotation.JsonIgnore; - -public class JerseyClient implements HttpClient { - - /** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(JerseyClient.class); - - protected final String name; - protected final boolean https; - protected final boolean selfSignedCerts; - protected final String hostname; - protected final int port; - protected final String basePath; - protected final String userName; - protected final String password; - - protected final Client client; - protected final String baseUrl; - - protected boolean alive = true; - - - public JerseyClient(String name, boolean https, - boolean selfSignedCerts, - String hostname, int port, - String basePath, String userName, - String password) - throws KeyManagementException, NoSuchAlgorithmException { - - super(); - - if (name == null || name.isEmpty()) - throw new IllegalArgumentException("Name must be provided"); - - if (hostname == null || hostname.isEmpty()) - throw new IllegalArgumentException("Hostname must be provided"); - - if (port <= 0 && port >= 65535) - throw new IllegalArgumentException("Invalid Port provided: " + port); - - this.name = name; - this.https = https; - this.hostname = hostname; - this.port = port; - this.basePath = basePath; - this.userName = userName; - this.password = password; - this.selfSignedCerts = selfSignedCerts; - - StringBuilder tmpBaseUrl = new StringBuilder(); - if (this.https) { - tmpBaseUrl.append("https://"); - ClientBuilder clientBuilder; - SSLContext sslContext = SSLContext.getInstance("TLSv1.2"); - if (this.selfSignedCerts) { - sslContext.init(null, new TrustManager[]{new X509TrustManager() { - @Override - public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { - // always trusted - } - @Override - public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { - // always trusted - } - @Override - public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } - - }}, new SecureRandom()); - clientBuilder = ClientBuilder.newBuilder().sslContext(sslContext).hostnameVerifier((host,session) -> true); - } else { - sslContext.init(null, null, null); - clientBuilder = ClientBuilder.newBuilder().sslContext(sslContext); - } - this.client = clientBuilder.build(); - } else { - tmpBaseUrl.append("http://"); - this.client = ClientBuilder.newClient(); - } - - if (this.userName != null && !this.userName.isEmpty() && - this.password != null && !this.password.isEmpty()) { - HttpAuthenticationFeature authFeature = HttpAuthenticationFeature.basic(userName, password); - this.client.register(authFeature); - } - - this.baseUrl = tmpBaseUrl.append(this.hostname).append(":"). - append(this.port).append("/"). - append((this.basePath == null) ? "" : this.basePath). - toString(); - } - - @Override - public Response get(String path) { - if (path != null && !path.isEmpty()) - return this.client.target(this.baseUrl).path(path).request().get(); - else - return this.client.target(this.baseUrl).request().get(); - } - - @Override - public Response get() { - return this.client.target(this.baseUrl).request().get(); - } - - - @Override - public boolean start() { - return alive; - } - - @Override - public boolean stop() { - return !alive; - } - - @Override - public void shutdown() { - synchronized(this) { - alive = false; - } - - try { - this.client.close(); - } catch (Exception e) { - logger.warn("{}: cannot close because of {}", this, - e.getMessage(), e); - } - } - - @Override - public synchronized boolean isAlive() { - return this.alive; - } - - @Override - public String getName() { - return name; - } - - @Override - public boolean isHttps() { - return https; - } - - @Override - public boolean isSelfSignedCerts() { - return selfSignedCerts; - } - - @Override - public String getHostname() { - return hostname; - } - - @Override - public int getPort() { - return port; - } - - @Override - public String getBasePath() { - return basePath; - } - - @Override - public String getUserName() { - return userName; - } - - @JsonIgnore - @Override - public String getPassword() { - return password; - } - - @Override - public String getBaseUrl() { - return baseUrl; - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("JerseyClient [name="); - builder.append(name); - builder.append(", https="); - builder.append(https); - builder.append(", selfSignedCerts="); - builder.append(selfSignedCerts); - builder.append(", hostname="); - builder.append(hostname); - builder.append(", port="); - builder.append(port); - builder.append(", basePath="); - builder.append(basePath); - builder.append(", userName="); - builder.append(userName); - builder.append(", password="); - builder.append(password); - builder.append(", client="); - builder.append(client); - builder.append(", baseUrl="); - builder.append(baseUrl); - builder.append(", alive="); - builder.append(alive); - builder.append("]"); - return builder.toString(); - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/HttpServletServer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/HttpServletServer.java deleted file mode 100644 index 3cd702ae..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/HttpServletServer.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.policy.drools.http.server; - -import org.onap.policy.drools.properties.Startable; - -/** - * A Jetty Server to server REST Requests - */ -public interface HttpServletServer extends Startable { - - - /** - * factory for managing and tracking DMAAP sources - */ - public static HttpServletServerFactory factory = new IndexedHttpServletServerFactory(); - - /** - * - * @return port - */ - public int getPort(); - - /** - * enables basic authentication with user and password on the the relative path relativeUriPath - * - * @param user - * @param password - * @param relativeUriPath - */ - public void setBasicAuthentication(String user, String password, String relativeUriPath); - - /** - * adds a JAX-RS servlet class to serve REST requests - * - * @param servletPath servlet path - * @param restClass JAX-RS API Class - * - * @throws IllegalArgumentException unable to process because of invalid input - * @throws IllegalStateException unable to process because of invalid state - */ - public void addServletClass(String servletPath, String restClass); - - /** - * adds a package containing JAX-RS classes to serve REST requests - * - * @param servletPath servlet path - * @param restPackage JAX-RS package to scan - * - * @throws IllegalArgumentException unable to process because of invalid input - * @throws IllegalStateException unable to process because of invalid state - */ - public void addServletPackage(String servletPath, String restPackage); - - /** - * blocking start of the http server - * - * @param maxWaitTime max time to wait for the start to take place - * @return true if start was successful - * - * @throws IllegalArgumentException if arguments are invalid - * @throws InterruptedException if the blocking operation is interrupted - */ - public boolean waitedStart(long maxWaitTime) throws InterruptedException; -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/HttpServletServerFactory.java b/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/HttpServletServerFactory.java deleted file mode 100644 index f4dc85bc..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/HttpServletServerFactory.java +++ /dev/null @@ -1,261 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.policy.drools.http.server; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Properties; - -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; -import org.onap.policy.drools.http.server.internal.JettyJerseyServer; -import org.onap.policy.drools.properties.PolicyProperties; - -/** - * Factory of HTTP Servlet-Enabled Servlets - */ -public interface HttpServletServerFactory { - - /** - * builds an http server with support for servlets - * - * @param name name - * @param host binding host - * @param port port - * @param contextPath server base path - * @param swagger enable swagger documentation - * @param managed is it managed by infrastructure - * @return http server - * @throws IllegalArgumentException when invalid parameters are provided - */ - public HttpServletServer build(String name, String host, int port, String contextPath, - boolean swagger, boolean managed); - - /** - * list of http servers per properties - * - * @param properties properties based configuration - * @return list of http servers - * @throws IllegalArgumentException when invalid parameters are provided - */ - public List build(Properties properties); - - /** - * gets a server based on the port - * - * @param port port - * @return http server - */ - public HttpServletServer get(int port); - - /** - * provides an inventory of servers - * - * @return inventory of servers - */ - public List inventory(); - - /** - * destroys server bound to a port - * @param port - */ - public void destroy(int port); - - /** - * destroys the factory and therefore all servers - */ - public void destroy(); -} - -/** - * Indexed factory implementation - */ -class IndexedHttpServletServerFactory implements HttpServletServerFactory { - - private static final String SPACES_COMMA_SPACES = "\\s*,\\s*"; - - /** - * logger - */ - protected static Logger logger = LoggerFactory.getLogger(IndexedHttpServletServerFactory.class); - - /** - * servers index - */ - protected HashMap servers = new HashMap<>(); - - @Override - public synchronized HttpServletServer build(String name, String host, int port, - String contextPath, boolean swagger, - boolean managed) { - - if (servers.containsKey(port)) - return servers.get(port); - - JettyJerseyServer server = new JettyJerseyServer(name, host, port, contextPath, swagger); - if (managed) - servers.put(port, server); - - return server; - } - - @Override - public synchronized List build(Properties properties) { - - ArrayList serviceList = new ArrayList<>(); - - String serviceNames = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES); - if (serviceNames == null || serviceNames.isEmpty()) { - logger.warn("No topic for HTTP Service: {}", properties); - return serviceList; - } - - List serviceNameList = Arrays.asList(serviceNames.split(SPACES_COMMA_SPACES)); - - for (String serviceName : serviceNameList) { - String servicePortString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + - serviceName + - PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX); - - int servicePort; - try { - if (servicePortString == null || servicePortString.isEmpty()) { - if (logger.isWarnEnabled()) - logger.warn("No HTTP port for service in {}", serviceName); - continue; - } - servicePort = Integer.parseInt(servicePortString); - } catch (NumberFormatException nfe) { - if (logger.isWarnEnabled()) - logger.warn("No HTTP port for service in {}", serviceName); - continue; - } - - String hostName = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + - serviceName + - PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX); - - String contextUriPath = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + - serviceName + - PolicyProperties.PROPERTY_HTTP_CONTEXT_URIPATH_SUFFIX); - - String userName = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + - serviceName + - PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX); - - String password = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + - serviceName + - PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX); - - String authUriPath = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + - serviceName + - PolicyProperties.PROPERTY_HTTP_AUTH_URIPATH_SUFFIX); - - String restClasses = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + - serviceName + - PolicyProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX); - - String restPackages = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + - serviceName + - PolicyProperties.PROPERTY_HTTP_REST_PACKAGES_SUFFIX); - String restUriPath = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + - serviceName + - PolicyProperties.PROPERTY_HTTP_REST_URIPATH_SUFFIX); - - String managedString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + - serviceName + - PolicyProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String swaggerString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + - serviceName + - PolicyProperties.PROPERTY_HTTP_SWAGGER_SUFFIX); - boolean swagger = false; - if (swaggerString != null && !swaggerString.isEmpty()) { - swagger = Boolean.parseBoolean(swaggerString); - } - - HttpServletServer service = build(serviceName, hostName, servicePort, contextUriPath, swagger, managed); - if (userName != null && !userName.isEmpty() && password != null && !password.isEmpty()) { - service.setBasicAuthentication(userName, password, authUriPath); - } - - if (restClasses != null && !restClasses.isEmpty()) { - List restClassesList = Arrays.asList(restClasses.split(SPACES_COMMA_SPACES)); - for (String restClass : restClassesList) - service.addServletClass(restUriPath, restClass); - } - - if (restPackages != null && !restPackages.isEmpty()) { - List restPackageList = Arrays.asList(restPackages.split(SPACES_COMMA_SPACES)); - for (String restPackage : restPackageList) - service.addServletPackage(restUriPath, restPackage); - } - - serviceList.add(service); - } - - return serviceList; - } - - @Override - public synchronized HttpServletServer get(int port) { - - if (servers.containsKey(port)) { - return servers.get(port); - } - - throw new IllegalArgumentException("Http Server for " + port + " not found"); - } - - @Override - public synchronized List inventory() { - return new ArrayList<>(this.servers.values()); - } - - @Override - public synchronized void destroy(int port) { - - if (!servers.containsKey(port)) { - return; - } - - HttpServletServer server = servers.remove(port); - server.shutdown(); - } - - @Override - public synchronized void destroy() { - List httpServletServers = this.inventory(); - for (HttpServletServer server: httpServletServers) { - server.shutdown(); - } - - synchronized(this) { - this.servers.clear(); - } - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/internal/JettyJerseyServer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/internal/JettyJerseyServer.java deleted file mode 100644 index 0cbd983d..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/internal/JettyJerseyServer.java +++ /dev/null @@ -1,249 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.policy.drools.http.server.internal; - -import java.util.HashMap; -import org.eclipse.jetty.servlet.ServletHolder; -import org.onap.policy.drools.utils.NetworkUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import io.swagger.jersey.config.JerseyJaxrsConfig; - -/** - * REST Jetty Server that uses Jersey Servlets to support JAX-RS Web Services - */ -public class JettyJerseyServer extends JettyServletServer { - - /** - * Swagger API Base Path - */ - protected static final String SWAGGER_API_BASEPATH = "swagger.api.basepath"; - - /** - * Swagger Context ID - */ - protected static final String SWAGGER_CONTEXT_ID = "swagger.context.id"; - - /** - * Swagger Scanner ID - */ - protected static final String SWAGGER_SCANNER_ID = "swagger.scanner.id"; - - /** - * Swagger Pretty Print - */ - protected static final String SWAGGER_PRETTY_PRINT = "swagger.pretty.print"; - - /** - * Swagger Packages - */ - protected static final String SWAGGER_INIT_PACKAGES_PARAM_VALUE = "io.swagger.jaxrs.listing"; - - /** - * Jersey Packages Init Param Name - */ - protected static final String JERSEY_INIT_PACKAGES_PARAM_NAME = "jersey.config.server.provider.packages"; - - /** - * Jersey Packages Init Param Value - */ - protected static final String JERSEY_INIT_PACKAGES_PARAM_VALUE = "com.fasterxml.jackson.jaxrs.json"; - - /** - * Jersey Classes Init Param Name - */ - protected static final String JERSEY_INIT_CLASSNAMES_PARAM_NAME = "jersey.config.server.provider.classnames"; - - /** - * Jersey Jackson Classes Init Param Value - */ - protected static final String JERSEY_JACKSON_INIT_CLASSNAMES_PARAM_VALUE = "com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider"; - - /** - * Jersey Swagger Classes Init Param Value - */ - protected static final String SWAGGER_INIT_CLASSNAMES_PARAM_VALUE = "io.swagger.jaxrs.listing.ApiListingResource," + - "io.swagger.jaxrs.listing.SwaggerSerializers"; - /** - * Logger - */ - protected static Logger logger = LoggerFactory.getLogger(JettyJerseyServer.class); - - /** - * Container for servlets - */ - protected HashMap servlets = new HashMap<>(); - - /** - * Swagger ID - */ - protected String swaggerId = null; - - /** - * Constructor - * - * @param name name - * @param host host server host - * @param port port server port - * @param swagger support swagger? - * @param contextPath context path - * - * @throws IllegalArgumentException in invalid arguments are provided - */ - public JettyJerseyServer(String name, String host, int port, String contextPath, boolean swagger) { - - super(name, host, port, contextPath); - if (swagger) { - this.swaggerId = "swagger-" + this.port; - attachSwaggerServlet(); - } - } - - /** - * attaches a swagger initialization servlet - */ - protected void attachSwaggerServlet() { - - ServletHolder swaggerServlet = context.addServlet(JerseyJaxrsConfig.class, "/"); - - String hostname = this.connector.getHost(); - if (hostname == null || hostname.isEmpty() || hostname.equals(NetworkUtil.IPv4_WILDCARD_ADDRESS)) { - hostname = NetworkUtil.getHostname(); - } - - swaggerServlet.setInitParameter(SWAGGER_API_BASEPATH, - "http://" + hostname + ":" + this.connector.getPort() + "/"); - swaggerServlet.setInitParameter(SWAGGER_CONTEXT_ID, swaggerId); - swaggerServlet.setInitParameter(SWAGGER_SCANNER_ID, swaggerId); - swaggerServlet.setInitParameter(SWAGGER_PRETTY_PRINT, "true"); - swaggerServlet.setInitOrder(2); - - if (logger.isDebugEnabled()) - logger.debug("{}: Swagger Servlet has been attached: {}", this, swaggerServlet.dump()); - } - - /** - * retrieves cached server based on servlet path - * - * @param servletPath servlet path - * @return the jetty servlet holder - * - * @throws IllegalArgumentException if invalid arguments are provided - */ - protected synchronized ServletHolder getServlet(String servletPath) { - - ServletHolder jerseyServlet = servlets.get(servletPath); - if (jerseyServlet == null) { - jerseyServlet = context.addServlet - (org.glassfish.jersey.servlet.ServletContainer.class, servletPath); - jerseyServlet.setInitOrder(0); - servlets.put(servletPath, jerseyServlet); - } - - return jerseyServlet; - } - - @Override - public synchronized void addServletPackage(String servletPath, String restPackage) { - String servPath = servletPath; - if (restPackage == null || restPackage.isEmpty()) - throw new IllegalArgumentException("No discoverable REST package provided"); - - if (servPath == null || servPath.isEmpty()) - servPath = "/*"; - - ServletHolder jerseyServlet = this.getServlet(servPath); - - String initClasses = - jerseyServlet.getInitParameter(JERSEY_INIT_CLASSNAMES_PARAM_NAME); - if (initClasses != null && !initClasses.isEmpty()) - logger.warn("Both packages and classes are used in Jetty+Jersey Configuration: {}", restPackage); - - String initPackages = - jerseyServlet.getInitParameter(JERSEY_INIT_PACKAGES_PARAM_NAME); - if (initPackages == null) { - if (this.swaggerId != null) { - initPackages = JERSEY_INIT_PACKAGES_PARAM_VALUE + "," + - SWAGGER_INIT_PACKAGES_PARAM_VALUE + "," + - restPackage; - - jerseyServlet.setInitParameter(SWAGGER_CONTEXT_ID, swaggerId); - jerseyServlet.setInitParameter(SWAGGER_SCANNER_ID, swaggerId); - } else { - initPackages = JERSEY_INIT_PACKAGES_PARAM_VALUE + "," + - restPackage; - } - } else { - initPackages = initPackages + "," + restPackage; - } - - jerseyServlet.setInitParameter(JERSEY_INIT_PACKAGES_PARAM_NAME, initPackages); - - if (logger.isDebugEnabled()) - logger.debug("{}: added REST package: {}", this, jerseyServlet.dump()); - } - - @Override - public synchronized void addServletClass(String servletPath, String restClass) { - - if (restClass == null || restClass.isEmpty()) - throw new IllegalArgumentException("No discoverable REST class provided"); - - if (servletPath == null || servletPath.isEmpty()) - servletPath = "/*"; - - ServletHolder jerseyServlet = this.getServlet(servletPath); - - String initPackages = - jerseyServlet.getInitParameter(JERSEY_INIT_PACKAGES_PARAM_NAME); - if (initPackages != null && !initPackages.isEmpty()) - logger.warn("Both classes and packages are used in Jetty+Jersey Configuration: {}", restClass); - - String initClasses = - jerseyServlet.getInitParameter(JERSEY_INIT_CLASSNAMES_PARAM_NAME); - if (initClasses == null) { - if (this.swaggerId != null) { - initClasses = JERSEY_JACKSON_INIT_CLASSNAMES_PARAM_VALUE + "," + - SWAGGER_INIT_CLASSNAMES_PARAM_VALUE + "," + - restClass; - - jerseyServlet.setInitParameter(SWAGGER_CONTEXT_ID, swaggerId); - jerseyServlet.setInitParameter(SWAGGER_SCANNER_ID, swaggerId); - } else { - initClasses = JERSEY_JACKSON_INIT_CLASSNAMES_PARAM_VALUE + "," + restClass; - } - } else { - initClasses = initClasses + "," + restClass; - } - - jerseyServlet.setInitParameter(JERSEY_INIT_CLASSNAMES_PARAM_NAME, initClasses); - - if (logger.isDebugEnabled()) - logger.debug("{}: added REST class: {}", this, jerseyServlet.dump()); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("JettyJerseyServer [servlets=").append(servlets).append(", swaggerId=").append(swaggerId) - .append(", toString()=").append(super.toString()).append("]"); - return builder.toString(); - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/internal/JettyServletServer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/internal/JettyServletServer.java deleted file mode 100644 index 08c62445..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/internal/JettyServletServer.java +++ /dev/null @@ -1,388 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.policy.drools.http.server.internal; - -import org.eclipse.jetty.security.ConstraintMapping; -import org.eclipse.jetty.security.ConstraintSecurityHandler; -import org.eclipse.jetty.security.HashLoginService; -import org.eclipse.jetty.security.authentication.BasicAuthenticator; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.ServerConnector; -import org.eclipse.jetty.server.Slf4jRequestLog; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.util.security.Constraint; -import org.eclipse.jetty.util.security.Credential; -import org.onap.policy.drools.http.server.HttpServletServer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.annotation.JsonIgnore; - -/** - * Http Server implementation using Embedded Jetty - */ -public abstract class JettyServletServer implements HttpServletServer, Runnable { - - /** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(JettyServletServer.class); - - /** - * server name - */ - protected final String name; - - /** - * server host address - */ - protected final String host; - - /** - * server port to bind - */ - protected final int port; - - /** - * server auth user name - */ - protected String user; - - /** - * server auth password name - */ - protected String password; - - /** - * server base context path - */ - protected final String contextPath; - - /** - * embedded jetty server - */ - protected final Server jettyServer; - - /** - * servlet context - */ - protected final ServletContextHandler context; - - /** - * jetty connector - */ - protected final ServerConnector connector; - - /** - * jetty thread - */ - protected volatile Thread jettyThread; - - /** - * start condition - */ - protected Object startCondition = new Object(); - - /** - * constructor - * - * @param name server name - * @param host server host - * @param port server port - * @param contextPath context path - * - * @throws IllegalArgumentException if invalid parameters are passed in - */ - public JettyServletServer(String name, String host, int port, String contextPath) { - String srvName = name; - String srvHost = host; - String ctxtPath = contextPath; - - if (srvName == null || srvName.isEmpty()) - srvName = "http-" + port; - - if (port <= 0 && port >= 65535) - throw new IllegalArgumentException("Invalid Port provided: " + port); - - if (srvHost == null || srvHost.isEmpty()) - srvHost = "localhost"; - - if (ctxtPath == null || ctxtPath.isEmpty()) - ctxtPath = "/"; - - this.name = srvName; - - this.host = srvHost; - this.port = port; - - this.contextPath = ctxtPath; - - this.context = new ServletContextHandler(ServletContextHandler.SESSIONS); - this.context.setContextPath(ctxtPath); - - this.jettyServer = new Server(); - this.jettyServer.setRequestLog(new Slf4jRequestLog()); - - this.connector = new ServerConnector(this.jettyServer); - this.connector.setName(srvName); - this.connector.setReuseAddress(true); - this.connector.setPort(port); - this.connector.setHost(srvHost); - - this.jettyServer.addConnector(this.connector); - this.jettyServer.setHandler(context); - } - - @Override - public void setBasicAuthentication(String user, String password, String servletPath) { - String srvltPath = servletPath; - - if (user == null || user.isEmpty() || password == null || password.isEmpty()) - throw new IllegalArgumentException("Missing user and/or password"); - - if (srvltPath == null || srvltPath.isEmpty()) - srvltPath = "/*"; - - HashLoginService hashLoginService = new HashLoginService(); - hashLoginService.putUser(user, - Credential.getCredential(password), - new String[] {"user"}); - hashLoginService.setName(this.connector.getName() + "-login-service"); - - Constraint constraint = new Constraint(); - constraint.setName(Constraint.__BASIC_AUTH); - constraint.setRoles(new String[]{"user"}); - constraint.setAuthenticate(true); - - ConstraintMapping constraintMapping = new ConstraintMapping(); - constraintMapping.setConstraint(constraint); - constraintMapping.setPathSpec(srvltPath); - - ConstraintSecurityHandler securityHandler = new ConstraintSecurityHandler(); - securityHandler.setAuthenticator(new BasicAuthenticator()); - securityHandler.setRealmName(this.connector.getName() + "-realm"); - securityHandler.addConstraintMapping(constraintMapping); - securityHandler.setLoginService(hashLoginService); - - this.context.setSecurityHandler(securityHandler); - - this.user = user; - this.password = password; - } - - /** - * jetty server execution - */ - @Override - public void run() { - try { - logger.info("{}: STARTING", this); - - this.jettyServer.start(); - - if (logger.isInfoEnabled()) - logger.info("{}: STARTED: {}", this, this.jettyServer.dump()); - - synchronized(this.startCondition) { - this.startCondition.notifyAll(); - } - - this.jettyServer.join(); - } catch (Exception e) { - logger.error("{}: error found while bringing up server", this, e); - } - } - - @Override - public boolean waitedStart(long maxWaitTime) throws InterruptedException { - logger.info("{}: WAITED-START", this); - - if (maxWaitTime < 0) - throw new IllegalArgumentException("max-wait-time cannot be negative"); - - long pendingWaitTime = maxWaitTime; - - if (!this.start()) - return false; - - synchronized (this.startCondition) { - - while (!this.jettyServer.isRunning()) { - try { - long startTs = System.currentTimeMillis(); - - this.startCondition.wait(pendingWaitTime); - - if (maxWaitTime == 0) - /* spurious notification */ - continue; - - long endTs = System.currentTimeMillis(); - pendingWaitTime = pendingWaitTime - (endTs - startTs); - - logger.info("{}: pending time is {} ms.", this, pendingWaitTime); - - if (pendingWaitTime <= 0) - return false; - - } catch (InterruptedException e) { - logger.warn("{}: waited-start has been interrupted", this); - throw e; - } - } - - return this.jettyServer.isRunning(); - } - } - - @Override - public boolean start() { - logger.info("{}: STARTING", this); - - synchronized(this) { - if (jettyThread == null || - !this.jettyThread.isAlive()) { - - this.jettyThread = new Thread(this); - this.jettyThread.setName(this.name + "-" + this.port); - this.jettyThread.start(); - } - } - - return true; - } - - @Override - public boolean stop() { - logger.info("{}: STOPPING", this); - - synchronized(this) { - if (jettyThread == null) { - return true; - } - - if (!jettyThread.isAlive()) { - this.jettyThread = null; - } - - try { - this.connector.stop(); - } catch (Exception e) { - logger.error("{}: error while stopping management server", this, e); - } - - try { - this.jettyServer.stop(); - } catch (Exception e) { - logger.error("{}: error while stopping management server", this, e); - return false; - } - - Thread.yield(); - } - - return true; - } - - @Override - public void shutdown() { - logger.info("{}: SHUTTING DOWN", this); - - this.stop(); - - if (this.jettyThread == null) - return; - - Thread jettyThreadCopy = this.jettyThread; - - if (jettyThreadCopy.isAlive()) { - try { - jettyThreadCopy.join(2000L); - } catch (InterruptedException e) { - logger.warn("{}: error while shutting down management server", this); - Thread.currentThread().interrupt(); - } - if (!jettyThreadCopy.isInterrupted()) { - try { - jettyThreadCopy.interrupt(); - } catch(Exception e) { - // do nothing - logger.warn("{}: exception while shutting down (OK)", this, e); - } - } - } - - this.jettyServer.destroy(); - } - - @Override - public boolean isAlive() { - if (this.jettyThread != null) - return this.jettyThread.isAlive(); - - return false; - } - - @Override - public int getPort() { - return this.port; - } - - /** - * @return the name - */ - public String getName() { - return name; - } - - /** - * @return the host - */ - public String getHost() { - return host; - } - - /** - * @return the user - */ - public String getUser() { - return user; - } - - /** - * @return the password - */ - @JsonIgnore - public String getPassword() { - return password; - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("JettyServer [name=").append(name).append(", host=").append(host).append(", port=").append(port) - .append(", user=").append(user).append(", password=").append(password != null).append(", contextPath=") - .append(contextPath).append(", jettyServer=").append(jettyServer).append(", context=").append(this.context) - .append(", connector=").append(connector).append(", jettyThread=").append(jettyThread) - .append("]"); - return builder.toString(); - } - -} -- cgit 1.2.3-korg