summaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main
diff options
context:
space:
mode:
authormmis <michael.morris@ericsson.com>2018-07-19 13:21:08 +0100
committermmis <michael.morris@ericsson.com>2018-07-25 09:14:33 +0100
commit64f53ef14f5a9ea98208fd2b835bfb01fda9a5f9 (patch)
tree60afa251c126287a85c89e0b1dc7fd5d1a766439 /policy-endpoints/src/main
parente8f1b7235f8338fbb9eba28d8cff29d3d6adf6e7 (diff)
Copy policy-endpoints from drools-pdp to common
Removed policy-endpoints, and 3 classes from policy-core. Replaced refenences to the deleted classes with references to the corresponding classes in policy-common Issue-ID: POLICY-967 Change-Id: I547cde4894424b8f40b7ddd4e2342ebb729cb588 Signed-off-by: mmis <michael.morris@ericsson.com>
Diffstat (limited to 'policy-endpoints/src/main')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/FilterableTopicSource.java39
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/Topic.java86
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java658
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicListener.java38
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicRegisterable.java42
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicSink.java40
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicSource.java37
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/ApiKeyEnabled.java36
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/BusTopicSink.java47
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/BusTopicSource.java78
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSink.java30
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java465
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSource.java29
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java587
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSink.java126
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java230
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSink.java32
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSinkFactory.java306
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSource.java34
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSourceFactory.java394
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java546
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java391
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusTopicBase.java131
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java212
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java144
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java94
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java332
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java197
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java95
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/TopicBase.java228
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/http/client/HttpClient.java48
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/http/client/HttpClientFactory.java221
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/http/client/internal/JerseyClient.java249
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/http/server/HttpServletServer.java82
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/http/server/HttpServletServerFactory.java261
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/http/server/internal/JettyJerseyServer.java249
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/http/server/internal/JettyServletServer.java388
37 files changed, 0 insertions, 7202 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/FilterableTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/FilterableTopicSource.java
deleted file mode 100644
index b1e0e1c2..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/FilterableTopicSource.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm;
-
-/**
- * TopicSource that supports server-side filtering.
- */
-public interface FilterableTopicSource extends TopicSource {
-
- /**
- * Sets the server-side filter.
- *
- * @param filter new filter value, or {@code null}
- * @throws UnsupportedOperationException if the consumer does not support
- * server-side filtering
- * @throws IllegalArgumentException if the consumer cannot be built with the
- * new filter
- */
- public void setFilter(String filter);
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/Topic.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/Topic.java
deleted file mode 100644
index 30174f1f..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/Topic.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm;
-
-import java.util.List;
-
-import org.onap.policy.drools.properties.Lockable;
-import org.onap.policy.drools.properties.Startable;
-
-/**
- * Essential Topic Data
- */
-public interface Topic extends TopicRegisterable, Startable, Lockable {
-
- /**
- * network logger
- */
- public static final String NETWORK_LOGGER = "network";
-
- /**
- * Underlying Communication infrastructure Types
- */
- public enum CommInfrastructure {
- /**
- * UEB Communication Infrastructure
- */
- UEB,
- /**
- * DMAAP Communication Infrastructure
- */
- DMAAP,
- /**
- * NOOP for internal use only
- */
- NOOP,
- /**
- * REST Communication Infrastructure
- */
- REST
- }
-
- /**
- * gets the topic name
- *
- * @return topic name
- */
- public String getTopic();
-
- /**
- * gets the communication infrastructure type
- * @return
- */
- public CommInfrastructure getTopicCommInfrastructure();
-
- /**
- * return list of servers
- * @return bus servers
- */
- public List<String> getServers();
-
- /**
- * get the more recent events in this topic entity
- *
- * @return list of most recent events
- */
- public String[] getRecentEvents();
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java
deleted file mode 100644
index 5c04bb8f..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java
+++ /dev/null
@@ -1,658 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import org.onap.policy.drools.event.comm.bus.DmaapTopicSink;
-import org.onap.policy.drools.event.comm.bus.DmaapTopicSource;
-import org.onap.policy.drools.event.comm.bus.NoopTopicSink;
-import org.onap.policy.drools.event.comm.bus.UebTopicSink;
-import org.onap.policy.drools.event.comm.bus.UebTopicSource;
-import org.onap.policy.drools.properties.Lockable;
-import org.onap.policy.drools.properties.Startable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-/**
- * Abstraction to managed the system's Networked Topic Endpoints, sources of all events input into
- * the System.
- */
-public interface TopicEndpoint extends Startable, Lockable {
-
- /**
- * singleton for global access
- */
- public static final TopicEndpoint manager = new ProxyTopicEndpointManager();
-
- /**
- * Add Topic Sources to the communication infrastructure initialized per properties
- *
- * @param properties properties for Topic Source construction
- * @return a generic Topic Source
- * @throws IllegalArgumentException when invalid arguments are provided
- */
- public List<TopicSource> addTopicSources(Properties properties);
-
- /**
- * Add Topic Sinks to the communication infrastructure initialized per properties
- *
- * @param properties properties for Topic Sink construction
- * @return a generic Topic Sink
- * @throws IllegalArgumentException when invalid arguments are provided
- */
- public List<TopicSink> addTopicSinks(Properties properties);
-
- /**
- * gets all Topic Sources
- *
- * @return the Topic Source List
- */
- List<TopicSource> getTopicSources();
-
- /**
- * get the Topic Sources for the given topic name
- *
- * @param topicName the topic name
- *
- * @return the Topic Source List
- * @throws IllegalStateException if the entity is in an invalid state
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public List<TopicSource> getTopicSources(List<String> topicNames);
-
- /**
- * gets the Topic Source for the given topic name and underlying communication infrastructure type
- *
- * @param commType communication infrastructure type
- * @param topicName the topic name
- *
- * @return the Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for example multiple
- * TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- * @throws UnsupportedOperationException if the operation is not supported.
- */
- public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName);
-
- /**
- * get the UEB Topic Source for the given topic name
- *
- * @param topicName the topic name
- *
- * @return the UEB Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for example multiple
- * TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public UebTopicSource getUebTopicSource(String topicName);
-
- /**
- * get the DMAAP Topic Source for the given topic name
- *
- * @param topicName the topic name
- *
- * @return the DMAAP Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for example multiple
- * TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public DmaapTopicSource getDmaapTopicSource(String topicName);
-
- /**
- * get the Topic Sinks for the given topic name
- *
- * @param topicNames the topic names
- * @return the Topic Sink List
- * @throws IllegalStateException
- * @throws IllegalArgumentException
- */
- public List<TopicSink> getTopicSinks(List<String> topicNames);
-
- /**
- * get the Topic Sinks for the given topic name and underlying communication infrastructure type
- *
- * @param topicName the topic name
- * @param commType communication infrastructure type
- *
- * @return the Topic Sink List
- * @throws IllegalStateException if the entity is in an invalid state, for example multiple
- * TopicWriters for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName);
-
- /**
- * get the Topic Sinks for the given topic name and all the underlying communication
- * infrastructure type
- *
- * @param topicName the topic name
- * @param commType communication infrastructure type
- *
- * @return the Topic Sink List
- * @throws IllegalStateException if the entity is in an invalid state, for example multiple
- * TopicWriters for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public List<TopicSink> getTopicSinks(String topicName);
-
- /**
- * get the UEB Topic Source for the given topic name
- *
- * @param topicName the topic name
- *
- * @return the Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for example multiple
- * TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public UebTopicSink getUebTopicSink(String topicName);
-
- /**
- * get the no-op Topic Sink for the given topic name
- *
- * @param topicName the topic name
- *
- * @return the Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for example multiple
- * TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public NoopTopicSink getNoopTopicSink(String topicName);
-
- /**
- * get the DMAAP Topic Source for the given topic name
- *
- * @param topicName the topic name
- *
- * @return the Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for example multiple
- * TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public DmaapTopicSink getDmaapTopicSink(String topicName);
-
- /**
- * gets only the UEB Topic Sources
- *
- * @return the UEB Topic Source List
- */
- public List<UebTopicSource> getUebTopicSources();
-
- /**
- * gets only the DMAAP Topic Sources
- *
- * @return the DMAAP Topic Source List
- */
- public List<DmaapTopicSource> getDmaapTopicSources();
-
- /**
- * gets all Topic Sinks
- *
- * @return the Topic Sink List
- */
- public List<TopicSink> getTopicSinks();
-
- /**
- * gets only the UEB Topic Sinks
- *
- * @return the UEB Topic Sink List
- */
- public List<UebTopicSink> getUebTopicSinks();
-
- /**
- * gets only the DMAAP Topic Sinks
- *
- * @return the DMAAP Topic Sink List
- */
- public List<DmaapTopicSink> getDmaapTopicSinks();
-
- /**
- * gets only the NOOP Topic Sinks
- *
- * @return the NOOP Topic Sinks List
- */
- public List<NoopTopicSink> getNoopTopicSinks();
-}
-
-
-/*
- * ----------------- implementation -------------------
- */
-
-/**
- * This implementation of the Topic Endpoint Manager, proxies operations to appropriate
- * implementations according to the communication infrastructure that are supported
- */
-class ProxyTopicEndpointManager implements TopicEndpoint {
- /**
- * Logger
- */
- private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class);
- /**
- * Is this element locked?
- */
- protected volatile boolean locked = false;
-
- /**
- * Is this element alive?
- */
- protected volatile boolean alive = false;
-
- @Override
- public List<TopicSource> addTopicSources(Properties properties) {
-
- // 1. Create UEB Sources
- // 2. Create DMAAP Sources
-
- final List<TopicSource> sources = new ArrayList<>();
-
- sources.addAll(UebTopicSource.factory.build(properties));
- sources.addAll(DmaapTopicSource.factory.build(properties));
-
- if (this.isLocked()) {
- for (final TopicSource source : sources) {
- source.lock();
- }
- }
-
- return sources;
- }
-
- @Override
- public List<TopicSink> addTopicSinks(Properties properties) {
- // 1. Create UEB Sinks
- // 2. Create DMAAP Sinks
-
- final List<TopicSink> sinks = new ArrayList<>();
-
- sinks.addAll(UebTopicSink.factory.build(properties));
- sinks.addAll(DmaapTopicSink.factory.build(properties));
- sinks.addAll(NoopTopicSink.factory.build(properties));
-
- if (this.isLocked()) {
- for (final TopicSink sink : sinks) {
- sink.lock();
- }
- }
-
- return sinks;
- }
-
- @Override
- public List<TopicSource> getTopicSources() {
-
- final List<TopicSource> sources = new ArrayList<>();
-
- sources.addAll(UebTopicSource.factory.inventory());
- sources.addAll(DmaapTopicSource.factory.inventory());
-
- return sources;
- }
-
- @Override
- public List<TopicSink> getTopicSinks() {
-
- final List<TopicSink> sinks = new ArrayList<>();
-
- sinks.addAll(UebTopicSink.factory.inventory());
- sinks.addAll(DmaapTopicSink.factory.inventory());
- sinks.addAll(NoopTopicSink.factory.inventory());
-
- return sinks;
- }
-
- @JsonIgnore
- @Override
- public List<UebTopicSource> getUebTopicSources() {
- return UebTopicSource.factory.inventory();
- }
-
- @JsonIgnore
- @Override
- public List<DmaapTopicSource> getDmaapTopicSources() {
- return DmaapTopicSource.factory.inventory();
- }
-
- @JsonIgnore
- @Override
- public List<UebTopicSink> getUebTopicSinks() {
- return UebTopicSink.factory.inventory();
- }
-
- @JsonIgnore
- @Override
- public List<DmaapTopicSink> getDmaapTopicSinks() {
- return DmaapTopicSink.factory.inventory();
- }
-
- @JsonIgnore
- @Override
- public List<NoopTopicSink> getNoopTopicSinks() {
- return NoopTopicSink.factory.inventory();
- }
-
- @Override
- public boolean start() {
-
- synchronized (this) {
- if (this.locked) {
- throw new IllegalStateException(this + " is locked");
- }
-
- if (this.alive) {
- return true;
- }
-
- this.alive = true;
- }
-
- final List<Startable> endpoints = this.getEndpoints();
-
- boolean success = true;
- for (final Startable endpoint : endpoints) {
- try {
- success = endpoint.start() && success;
- } catch (final Exception e) {
- success = false;
- logger.error("Problem starting endpoint: {}", endpoint, e);
- }
- }
-
- return success;
- }
-
-
- @Override
- public boolean stop() {
-
- /*
- * stop regardless if it is locked, in other words, stop operation has precedence over locks.
- */
- synchronized (this) {
- this.alive = false;
- }
-
- final List<Startable> endpoints = this.getEndpoints();
-
- boolean success = true;
- for (final Startable endpoint : endpoints) {
- try {
- success = endpoint.stop() && success;
- } catch (final Exception e) {
- success = false;
- logger.error("Problem stopping endpoint: {}", endpoint, e);
- }
- }
-
- return success;
- }
-
- /**
- *
- * @return list of managed endpoints
- */
- @JsonIgnore
- protected List<Startable> getEndpoints() {
- final List<Startable> endpoints = new ArrayList<>();
-
- endpoints.addAll(this.getTopicSources());
- endpoints.addAll(this.getTopicSinks());
-
- return endpoints;
- }
-
- @Override
- public void shutdown() {
- UebTopicSource.factory.destroy();
- UebTopicSink.factory.destroy();
- NoopTopicSink.factory.destroy();
-
- DmaapTopicSource.factory.destroy();
- DmaapTopicSink.factory.destroy();
- }
-
- @Override
- public boolean isAlive() {
- return this.alive;
- }
-
- @Override
- public boolean lock() {
-
- synchronized (this) {
- if (this.locked)
- return true;
-
- this.locked = true;
- }
-
- for (final TopicSource source : this.getTopicSources()) {
- source.lock();
- }
-
- for (final TopicSink sink : this.getTopicSinks()) {
- sink.lock();
- }
-
- return true;
- }
-
- @Override
- public boolean unlock() {
- synchronized (this) {
- if (!this.locked)
- return true;
-
- this.locked = false;
- }
-
- for (final TopicSource source : this.getTopicSources()) {
- source.unlock();
- }
-
- for (final TopicSink sink : this.getTopicSinks()) {
- sink.unlock();
- }
-
- return true;
- }
-
- @Override
- public boolean isLocked() {
- return this.locked;
- }
-
- @Override
- public List<TopicSource> getTopicSources(List<String> topicNames) {
-
- if (topicNames == null) {
- throw new IllegalArgumentException("must provide a list of topics");
- }
-
- final List<TopicSource> sources = new ArrayList<>();
- for (final String topic : topicNames) {
- try {
- final TopicSource uebSource = this.getUebTopicSource(topic);
- if (uebSource != null)
- sources.add(uebSource);
- } catch (final Exception e) {
- logger.debug("No UEB source for topic: {}", topic, e);
- }
-
- try {
- final TopicSource dmaapSource = this.getDmaapTopicSource(topic);
- if (dmaapSource != null)
- sources.add(dmaapSource);
- } catch (final Exception e) {
- logger.debug("No DMAAP source for topic: {}", topic, e);
- }
- }
- return sources;
- }
-
- @Override
- public List<TopicSink> getTopicSinks(List<String> topicNames) {
-
- if (topicNames == null) {
- throw new IllegalArgumentException("must provide a list of topics");
- }
-
- final List<TopicSink> sinks = new ArrayList<>();
- for (final String topic : topicNames) {
- try {
- final TopicSink uebSink = this.getUebTopicSink(topic);
- if (uebSink != null)
- sinks.add(uebSink);
- } catch (final Exception e) {
- logger.debug("No UEB sink for topic: {}", topic, e);
- }
-
- try {
- final TopicSink dmaapSink = this.getDmaapTopicSink(topic);
- if (dmaapSink != null)
- sinks.add(dmaapSink);
- } catch (final Exception e) {
- logger.debug("No DMAAP sink for topic: {}", topic, e);
- }
-
- try {
- final TopicSink noopSink = this.getNoopTopicSink(topic);
- if (noopSink != null)
- sinks.add(noopSink);
- } catch (final Exception e) {
- logger.debug("No NOOP sink for topic: {}", topic, e);
- }
- }
- return sinks;
- }
-
- @Override
- public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) {
-
- if (commType == null) {
- throw parmException(topicName);
- }
-
- if (topicName == null) {
- throw parmException(topicName);
- }
-
- switch (commType) {
- case UEB:
- return this.getUebTopicSource(topicName);
- case DMAAP:
- return this.getDmaapTopicSource(topicName);
- default:
- throw new UnsupportedOperationException("Unsupported " + commType.name());
- }
- }
-
- private IllegalArgumentException parmException(String topicName) {
- return new IllegalArgumentException(
- "Invalid parameter: a communication infrastructure required to fetch " + topicName);
- }
-
- @Override
- public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) {
- if (commType == null) {
- throw parmException(topicName);
- }
-
- if (topicName == null) {
- throw parmException(topicName);
- }
-
- switch (commType) {
- case UEB:
- return this.getUebTopicSink(topicName);
- case DMAAP:
- return this.getDmaapTopicSink(topicName);
- case NOOP:
- return this.getNoopTopicSink(topicName);
- default:
- throw new UnsupportedOperationException("Unsupported " + commType.name());
- }
- }
-
- @Override
- public List<TopicSink> getTopicSinks(String topicName) {
- if (topicName == null) {
- throw parmException(topicName);
- }
-
- final List<TopicSink> sinks = new ArrayList<>();
-
- try {
- sinks.add(this.getUebTopicSink(topicName));
- } catch (final Exception e) {
- logNoSink(topicName, e);
- }
-
- try {
- sinks.add(this.getDmaapTopicSink(topicName));
- } catch (final Exception e) {
- logNoSink(topicName, e);
- }
-
- try {
- sinks.add(this.getNoopTopicSink(topicName));
- } catch (final Exception e) {
- logNoSink(topicName, e);
- }
-
- return sinks;
- }
-
-private void logNoSink(String topicName, Exception ex) {
- logger.debug("No sink for topic: {}", topicName, ex);
-}
-
- @Override
- public UebTopicSource getUebTopicSource(String topicName) {
- return UebTopicSource.factory.get(topicName);
- }
-
- @Override
- public UebTopicSink getUebTopicSink(String topicName) {
- return UebTopicSink.factory.get(topicName);
- }
-
- @Override
- public DmaapTopicSource getDmaapTopicSource(String topicName) {
- return DmaapTopicSource.factory.get(topicName);
- }
-
- @Override
- public DmaapTopicSink getDmaapTopicSink(String topicName) {
- return DmaapTopicSink.factory.get(topicName);
- }
-
- @Override
- public NoopTopicSink getNoopTopicSink(String topicName) {
- return NoopTopicSink.factory.get(topicName);
- }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicListener.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicListener.java
deleted file mode 100644
index 4c8552b6..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicListener.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm;
-
-/**
- * Listener for event messages entering the Policy Engine
- */
-@FunctionalInterface
-public interface TopicListener {
-
- /**
- * Notification of a new Event over a given Topic
- *
- * @param commType communication infrastructure type
- * @param topic topic name
- * @param event event message as a string
- */
- public void onTopicEvent(Topic.CommInfrastructure commType, String topic, String event);
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicRegisterable.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicRegisterable.java
deleted file mode 100644
index 540025e5..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicRegisterable.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm;
-
-/**
- * Marks a Topic entity as registerable
- */
-public interface TopicRegisterable {
-
- /**
- * Register for notification of events with this Topic Entity
- *
- * @param topicListener the listener of events
- */
- public void register(TopicListener topicListener);
-
- /**
- * Unregisters for notification of events with this Topic Entity
- *
- * @param topicListener the listener of events
- */
- public void unregister(TopicListener topicListener);
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicSink.java
deleted file mode 100644
index 5ea849ee..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicSink.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm;
-
-/**
- * Marks a given Topic Endpoint as able to send messages over a topic
- */
-public interface TopicSink extends Topic {
-
- /**
- * Sends a string message over this Topic Endpoint
- *
- * @param message message to send
- *
- * @return true if the send operation succeeded, false otherwise
- * @throws IllegalArgumentException an invalid message has been provided
- * @throws IllegalStateException the entity is in an state that prevents
- * it from sending messages, for example, locked or stopped.
- */
- public boolean send(String message);
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicSource.java
deleted file mode 100644
index 17cde607..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicSource.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm;
-
-/**
- * Marker for a Topic Entity, indicating that the entity is able to read
- * over a topic
- */
-public interface TopicSource extends Topic {
-
- /**
- * pushes an event into the source programatically
- *
- * @param event the event in json format
- * @return true if it can be processed correctly, false otherwise
- */
- public boolean offer(String event);
-
-} \ No newline at end of file
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/ApiKeyEnabled.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/ApiKeyEnabled.java
deleted file mode 100644
index 9ddf4fff..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/ApiKeyEnabled.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm.bus;
-
-/**
- * API
- */
-public interface ApiKeyEnabled {
- /**
- * @return api key
- */
- public String getApiKey();
-
- /**
- * @return api secret
- */
- public String getApiSecret();
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/BusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/BusTopicSink.java
deleted file mode 100644
index 99a600b0..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/BusTopicSink.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm.bus;
-
-import org.onap.policy.drools.event.comm.TopicSink;
-
-/**
- * Topic Sink over Bus Infrastructure (DMAAP/UEB)
- */
-public interface BusTopicSink extends ApiKeyEnabled, TopicSink {
- /**
- * Log Failures after X number of retries
- */
- public static final int DEFAULT_LOG_SEND_FAILURES_AFTER = 1;
-
- /**
- * Sets the UEB partition key for published messages
- *
- * @param partitionKey the partition key
- */
- public void setPartitionKey(String partitionKey);
-
- /**
- * return the partition key in used by the system to publish messages
- *
- * @return the partition key
- */
- public String getPartitionKey();
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/BusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/BusTopicSource.java
deleted file mode 100644
index 83d4e72c..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/BusTopicSource.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm.bus;
-
-import org.onap.policy.drools.event.comm.TopicSource;
-
-/**
- * Generic Topic Source for UEB/DMAAP Communication Infrastructure
- *
- */
-public interface BusTopicSource extends ApiKeyEnabled, TopicSource {
-
- /**
- * Default Timeout fetching in milliseconds
- */
- public static int DEFAULT_TIMEOUT_MS_FETCH = 15000;
-
- /**
- * Default maximum number of messages fetch at the time
- */
- public static int DEFAULT_LIMIT_FETCH = 100;
-
- /**
- * Definition of No Timeout fetching
- */
- public static int NO_TIMEOUT_MS_FETCH = -1;
-
- /**
- * Definition of No limit fetching
- */
- public static int NO_LIMIT_FETCH = -1;
-
- /**
- * gets the consumer group
- *
- * @return consumer group
- */
- public String getConsumerGroup();
-
- /**
- * gets the consumer instance
- *
- * @return consumer instance
- */
- public String getConsumerInstance();
-
- /**
- * gets the fetch timeout
- *
- * @return fetch timeout
- */
- public int getFetchTimeout();
-
- /**
- * gets the fetch limit
- *
- * @return fetch limit
- */
- public int getFetchLimit();
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSink.java
deleted file mode 100644
index 982fcafa..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSink.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm.bus;
-
-public interface DmaapTopicSink extends BusTopicSink {
-
- /**
- * Factory of UebTopicWriter for instantiation and management purposes
- */
-
- public static final DmaapTopicSinkFactory factory = new IndexedDmaapTopicSinkFactory();
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java
deleted file mode 100644
index 5ff5084e..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java
+++ /dev/null
@@ -1,465 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm.bus;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
-import org.onap.policy.drools.event.comm.bus.internal.InlineDmaapTopicSink;
-import org.onap.policy.drools.properties.PolicyProperties;
-
-/**
- * DMAAP Topic Sink Factory
- */
-public interface DmaapTopicSinkFactory {
- public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
- public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
- public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
- public final String DME2_VERSION_PROPERTY = "Version";
- public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
- public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
- public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
- public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
-
- /**
- * Instantiates a new DMAAP Topic Sink
- *
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param userName AAF user name
- * @param password AAF password
- * @param partitionKey Consumer Group
- * @param environment DME2 environment
- * @param aftEnvironment DME2 AFT environment
- * @param partner DME2 Partner
- * @param latitude DME2 latitude
- * @param longitude DME2 longitude
- * @param additionalProps additional properties to pass to DME2
- * @param managed is this sink endpoint managed?
- *
- * @return an DMAAP Topic Sink
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public DmaapTopicSink build(List<String> servers,
- String topic,
- String apiKey,
- String apiSecret,
- String userName,
- String password,
- String partitionKey,
- String environment,
- String aftEnvironment,
- String partner,
- String latitude,
- String longitude,
- Map<String,String> additionalProps,
- boolean managed,
- boolean useHttps,
- boolean allowSelfSignedCerts) ;
-
- /**
- * Instantiates a new DMAAP Topic Sink
- *
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param userName AAF user name
- * @param password AAF password
- * @param partitionKey Consumer Group
- * @param managed is this sink endpoint managed?
- *
- * @return an DMAAP Topic Sink
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public DmaapTopicSink build(List<String> servers,
- String topic,
- String apiKey,
- String apiSecret,
- String userName,
- String password,
- String partitionKey,
- boolean managed,
- boolean useHttps,
- boolean allowSelfSignedCerts);
-
- /**
- * Creates an DMAAP Topic Sink based on properties files
- *
- * @param properties Properties containing initialization values
- *
- * @return an DMAAP Topic Sink
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public List<DmaapTopicSink> build(Properties properties);
-
- /**
- * Instantiates a new DMAAP Topic Sink
- *
- * @param servers list of servers
- * @param topic topic name
- *
- * @return an DMAAP Topic Sink
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public DmaapTopicSink build(List<String> servers, String topic);
-
- /**
- * Destroys an DMAAP Topic Sink based on a topic
- *
- * @param topic topic name
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public void destroy(String topic);
-
- /**
- * gets an DMAAP Topic Sink based on topic name
- * @param topic the topic name
- *
- * @return an DMAAP Topic Sink with topic name
- * @throws IllegalArgumentException if an invalid topic is provided
- * @throws IllegalStateException if the DMAAP Topic Reader is
- * an incorrect state
- */
- public DmaapTopicSink get(String topic);
-
- /**
- * Provides a snapshot of the DMAAP Topic Sinks
- * @return a list of the DMAAP Topic Sinks
- */
- public List<DmaapTopicSink> inventory();
-
- /**
- * Destroys all DMAAP Topic Sinks
- */
- public void destroy();
-}
-
-/* ------------- implementation ----------------- */
-
-/**
- * Factory of DMAAP Reader Topics indexed by topic name
- */
-class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
- private static final String MISSING_TOPIC = "A topic must be provided";
-
- /**
- * Logger
- */
- private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class);
-
- /**
- * DMAAP Topic Name Index
- */
- protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>();
-
- @Override
- public DmaapTopicSink build(List<String> servers,
- String topic,
- String apiKey,
- String apiSecret,
- String userName,
- String password,
- String partitionKey,
- String environment,
- String aftEnvironment,
- String partner,
- String latitude,
- String longitude,
- Map<String,String> additionalProps,
- boolean managed,
- boolean useHttps,
- boolean allowSelfSignedCerts) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized (this) {
- if (dmaapTopicWriters.containsKey(topic)) {
- return dmaapTopicWriters.get(topic);
- }
-
- DmaapTopicSink dmaapTopicSink =
- new InlineDmaapTopicSink(servers, topic,
- apiKey, apiSecret,
- userName, password,
- partitionKey,
- environment, aftEnvironment,
- partner, latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts);
-
- if (managed)
- dmaapTopicWriters.put(topic, dmaapTopicSink);
- return dmaapTopicSink;
- }
- }
-
- @Override
- public DmaapTopicSink build(List<String> servers,
- String topic,
- String apiKey,
- String apiSecret,
- String userName,
- String password,
- String partitionKey,
- boolean managed,
- boolean useHttps, boolean allowSelfSignedCerts) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized (this) {
- if (dmaapTopicWriters.containsKey(topic)) {
- return dmaapTopicWriters.get(topic);
- }
-
- DmaapTopicSink dmaapTopicSink =
- new InlineDmaapTopicSink(servers, topic,
- apiKey, apiSecret,
- userName, password,
- partitionKey, useHttps, allowSelfSignedCerts);
-
- if (managed)
- dmaapTopicWriters.put(topic, dmaapTopicSink);
- return dmaapTopicSink;
- }
- }
-
- @Override
- public DmaapTopicSink build(List<String> servers, String topic) {
- return this.build(servers, topic, null, null, null, null, null, true, false, false);
- }
-
- @Override
- public List<DmaapTopicSink> build(Properties properties) {
-
- String writeTopics = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS);
- if (writeTopics == null || writeTopics.isEmpty()) {
- logger.info("{}: no topic for DMaaP Sink", this);
- return new ArrayList<>();
- }
-
- List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
- List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>();
- synchronized(this) {
- for (String topic: writeTopicList) {
- if (this.dmaapTopicWriters.containsKey(topic)) {
- newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
- continue;
- }
- String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." +
- topic +
- PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
-
- List<String> serverList;
- if (servers != null && !servers.isEmpty())
- serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
- else serverList = new ArrayList<>();
-
- String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
- String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
-
- String aafMechId = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
- String aafPassword = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
-
- String partitionKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
-
- String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic +
- PolicyProperties.PROPERTY_MANAGED_SUFFIX);
-
- /* DME2 Properties */
-
- String dme2Environment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
- + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
-
- String dme2AftEnvironment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
- + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
-
- String dme2Partner = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
- + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
-
- String dme2RouteOffer = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
- + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
-
- String dme2Latitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
- + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
-
- String dme2Longitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
- + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
-
- String dme2EpReadTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
- + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
-
- String dme2EpConnTimeout = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
- + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
-
- String dme2RoundtripTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS
- + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
-
- String dme2Version = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
- + PolicyProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
-
- String dme2SubContextPath = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
- + topic + PolicyProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
-
- String dme2SessionStickinessRequired = properties
- .getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
- + PolicyProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
-
- Map<String,String> dme2AdditionalProps = new HashMap<>();
-
- if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty())
- dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
- if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty())
- dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
- if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty())
- dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
- if (dme2Version != null && !dme2Version.isEmpty())
- dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
- if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty())
- dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
- if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty())
- dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
- if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty())
- dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
-
- if (servers == null || servers.isEmpty()) {
- logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
- continue;
- }
-
- boolean managed = true;
- if (managedString != null && !managedString.isEmpty()) {
- managed = Boolean.parseBoolean(managedString);
- }
-
- String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic +
- PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
-
- //default is to use HTTP if no https property exists
- boolean useHttps = false;
- if (useHttpsString != null && !useHttpsString.isEmpty()){
- useHttps = Boolean.parseBoolean(useHttpsString);
- }
-
-
- String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic +
- PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
-
- //default is to disallow self-signed certs
- boolean allowSelfSignedCerts = false;
- if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
- allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
- }
-
- DmaapTopicSink dmaapTopicSink = this.build(serverList, topic,
- apiKey, apiSecret,
- aafMechId, aafPassword,
- partitionKey,
- dme2Environment, dme2AftEnvironment,
- dme2Partner, dme2Latitude, dme2Longitude,
- dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts);
-
- newDmaapTopicSinks.add(dmaapTopicSink);
- }
- return newDmaapTopicSinks;
- }
- }
-
- @Override
- public void destroy(String topic) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- DmaapTopicSink dmaapTopicWriter;
- synchronized(this) {
- if (!dmaapTopicWriters.containsKey(topic)) {
- return;
- }
-
- dmaapTopicWriter = dmaapTopicWriters.remove(topic);
- }
-
- dmaapTopicWriter.shutdown();
- }
-
- @Override
- public void destroy() {
- List<DmaapTopicSink> writers = this.inventory();
- for (DmaapTopicSink writer: writers) {
- writer.shutdown();
- }
-
- synchronized(this) {
- this.dmaapTopicWriters.clear();
- }
- }
-
- @Override
- public DmaapTopicSink get(String topic) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized(this) {
- if (dmaapTopicWriters.containsKey(topic)) {
- return dmaapTopicWriters.get(topic);
- } else {
- throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
- }
- }
- }
-
- @Override
- public synchronized List<DmaapTopicSink> inventory() {
- return new ArrayList<>(this.dmaapTopicWriters.values());
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("IndexedDmaapTopicSinkFactory []");
- return builder.toString();
- }
-
-} \ No newline at end of file
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSource.java
deleted file mode 100644
index 8d9329fa..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSource.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm.bus;
-
-public interface DmaapTopicSource extends BusTopicSource {
-
- /**
- * factory for managing and tracking DMAAP sources
- */
- public static DmaapTopicSourceFactory factory = new IndexedDmaapTopicSourceFactory();
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java
deleted file mode 100644
index 5a8e2a72..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java
+++ /dev/null
@@ -1,587 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm.bus;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.onap.policy.drools.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
-import org.onap.policy.drools.properties.PolicyProperties;
-
-/**
- * DMAAP Topic Source Factory
- */
-public interface DmaapTopicSourceFactory {
- public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
- public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
- public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
- public final String DME2_VERSION_PROPERTY = "Version";
- public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
- public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
- public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
- public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
-
- /**
- * Creates an DMAAP Topic Source based on properties files
- *
- * @param properties Properties containing initialization values
- *
- * @return an DMAAP Topic Source
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public List<DmaapTopicSource> build(Properties properties);
-
- /**
- * Instantiates a new DMAAP Topic Source
- *
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param userName user name
- * @param password password
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Read Fetch Timeout
- * @param fetchLimit Fetch Limit
- * @param managed is this endpoind managed?
- * @param useHttps does the connection use HTTPS?
- * @param allowSelfSignedCerts does connection allow self-signed certificates?
- *
- * @return an DMAAP Topic Source
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public DmaapTopicSource build(List<String> servers,
- String topic,
- String apiKey,
- String apiSecret,
- String userName,
- String password,
- String consumerGroup,
- String consumerInstance,
- int fetchTimeout,
- int fetchLimit,
- boolean managed,
- boolean useHttps,
- boolean allowSelfSignedCerts);
-
- /**
- * Instantiates a new DMAAP Topic Source
- *
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param userName user name
- * @param password password
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Read Fetch Timeout
- * @param fetchLimit Fetch Limit
- * @param environment DME2 environment
- * @param aftEnvironment DME2 AFT environment
- * @param partner DME2 Partner
- * @param latitude DME2 latitude
- * @param longitude DME2 longitude
- * @param additionalProps additional properties to pass to DME2
- * @param managed is this endpoind managed?
- * @param useHttps does the connection use HTTPS?
- * @param allowSelfSignedCerts does connection allow self-signed certificates?
- *
- * @return an DMAAP Topic Source
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public DmaapTopicSource build(List<String> servers,
- String topic,
- String apiKey,
- String apiSecret,
- String userName,
- String password,
- String consumerGroup,
- String consumerInstance,
- int fetchTimeout,
- int fetchLimit,
- String environment,
- String aftEnvironment,
- String partner,
- String latitude,
- String longitude,
- Map<String,String> additionalProps,
- boolean managed,
- boolean useHttps,
- boolean allowSelfSignedCerts);
-
- /**
- * Instantiates a new DMAAP Topic Source
- *
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- *
- * @return an DMAAP Topic Source
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public DmaapTopicSource build(List<String> servers,
- String topic,
- String apiKey,
- String apiSecret);
-
- /**
- * Instantiates a new DMAAP Topic Source
- *
- * @param servers list of servers
- * @param topic topic name
- *
- * @return an DMAAP Topic Source
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public DmaapTopicSource build(List<String> servers,
- String topic);
-
- /**
- * Destroys an DMAAP Topic Source based on a topic
- *
- * @param topic topic name
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public void destroy(String topic);
-
- /**
- * Destroys all DMAAP Topic Sources
- */
- public void destroy();
-
- /**
- * gets an DMAAP Topic Source based on topic name
- * @param topic the topic name
- * @return an DMAAP Topic Source with topic name
- * @throws IllegalArgumentException if an invalid topic is provided
- * @throws IllegalStateException if the DMAAP Topic Source is
- * an incorrect state
- */
- public DmaapTopicSource get(String topic);
-
- /**
- * Provides a snapshot of the DMAAP Topic Sources
- * @return a list of the DMAAP Topic Sources
- */
- public List<DmaapTopicSource> inventory();
-}
-
-
-/* ------------- implementation ----------------- */
-
-/**
- * Factory of DMAAP Source Topics indexed by topic name
- */
-
-class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
- private static final String MISSING_TOPIC = "A topic must be provided";
-
- /**
- * Logger
- */
- private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
-
- /**
- * DMaaP Topic Name Index
- */
- protected HashMap<String, DmaapTopicSource> dmaapTopicSources =
- new HashMap<>();
-
- /**
- * {@inheritDoc}
- */
- @Override
- public DmaapTopicSource build(List<String> servers,
- String topic,
- String apiKey,
- String apiSecret,
- String userName,
- String password,
- String consumerGroup,
- String consumerInstance,
- int fetchTimeout,
- int fetchLimit,
- String environment,
- String aftEnvironment,
- String partner,
- String latitude,
- String longitude,
- Map<String,String> additionalProps,
- boolean managed,
- boolean useHttps,
- boolean allowSelfSignedCerts) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized(this) {
- if (dmaapTopicSources.containsKey(topic)) {
- return dmaapTopicSources.get(topic);
- }
-
- DmaapTopicSource dmaapTopicSource =
- new SingleThreadedDmaapTopicSource(servers, topic,
- apiKey, apiSecret, userName, password,
- consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit,
- environment, aftEnvironment, partner,
- latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts);
-
- if (managed)
- dmaapTopicSources.put(topic, dmaapTopicSource);
-
- return dmaapTopicSource;
- }
- }
- /**
- * {@inheritDoc}
- */
- @Override
- public DmaapTopicSource build(List<String> servers,
- String topic,
- String apiKey,
- String apiSecret,
- String userName,
- String password,
- String consumerGroup,
- String consumerInstance,
- int fetchTimeout,
- int fetchLimit,
- boolean managed,
- boolean useHttps,
- boolean allowSelfSignedCerts) {
-
- if (servers == null || servers.isEmpty()) {
- throw new IllegalArgumentException("DMaaP Server(s) must be provided");
- }
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized(this) {
- if (dmaapTopicSources.containsKey(topic)) {
- return dmaapTopicSources.get(topic);
- }
-
- DmaapTopicSource dmaapTopicSource =
- new SingleThreadedDmaapTopicSource(servers, topic,
- apiKey, apiSecret, userName, password,
- consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts);
-
- if (managed)
- dmaapTopicSources.put(topic, dmaapTopicSource);
-
- return dmaapTopicSource;
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public List<DmaapTopicSource> build(Properties properties) {
-
- String readTopics = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
- if (readTopics == null || readTopics.isEmpty()) {
- logger.info("{}: no topic for DMaaP Source", this);
- return new ArrayList<>();
- }
- List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
-
- List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
- synchronized(this) {
- for (String topic: readTopicList) {
- if (this.dmaapTopicSources.containsKey(topic)) {
- dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
- continue;
- }
-
- String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." +
- topic +
- PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
-
- List<String> serverList;
- if (servers != null && !servers.isEmpty())
- serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
- else serverList = new ArrayList<>();
-
- String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
-
- String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
-
- String aafMechId = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
-
- String aafPassword = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
-
- String consumerGroup = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
-
- String consumerInstance = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
-
- String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
-
- /* DME2 Properties */
-
- String dme2Environment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
-
- String dme2AftEnvironment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
-
- String dme2Partner = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
- + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
-
- String dme2RouteOffer = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
- + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
-
- String dme2Latitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
- + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
-
- String dme2Longitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
-
- String dme2EpReadTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
-
- String dme2EpConnTimeout = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
-
- String dme2RoundtripTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS
- + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
-
- String dme2Version = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
- + PolicyProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
-
- String dme2SubContextPath = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + topic + PolicyProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
-
- String dme2SessionStickinessRequired = properties
- .getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
- + PolicyProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
-
- Map<String,String> dme2AdditionalProps = new HashMap<>();
-
- if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty())
- dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
- if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty())
- dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
- if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty())
- dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
- if (dme2Version != null && !dme2Version.isEmpty())
- dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
- if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty())
- dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
- if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty())
- dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
- if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty())
- dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
-
-
- if (servers == null || servers.isEmpty()) {
-
- logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
- continue;
- }
-
- int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
- if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
- try {
- fetchTimeout = Integer.parseInt(fetchTimeoutString);
- } catch (NumberFormatException nfe) {
- logger.warn("{}: fetch timeout {} is in invalid format for topic {} ",
- this, fetchTimeoutString, topic);
- }
- }
-
- String fetchLimitString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
- int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
- if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
- try {
- fetchLimit = Integer.parseInt(fetchLimitString);
- } catch (NumberFormatException nfe) {
- logger.warn("{}: fetch limit {} is in invalid format for topic {} ",
- this, fetchLimitString, topic);
- }
- }
-
- String managedString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_MANAGED_SUFFIX);
- boolean managed = true;
- if (managedString != null && !managedString.isEmpty()) {
- managed = Boolean.parseBoolean(managedString);
- }
-
- String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic +
- PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
-
- //default is to use HTTP if no https property exists
- boolean useHttps = false;
- if (useHttpsString != null && !useHttpsString.isEmpty()){
- useHttps = Boolean.parseBoolean(useHttpsString);
- }
-
- String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic +
- PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
-
- //default is to disallow self-signed certs
- boolean allowSelfSignedCerts = false;
- if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
- allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
- }
-
-
- DmaapTopicSource uebTopicSource = this.build(serverList, topic,
- apiKey, apiSecret, aafMechId, aafPassword,
- consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit,
- dme2Environment, dme2AftEnvironment, dme2Partner,
- dme2Latitude, dme2Longitude, dme2AdditionalProps,
- managed, useHttps, allowSelfSignedCerts);
-
- dmaapTopicSourceLst.add(uebTopicSource);
- }
- }
- return dmaapTopicSourceLst;
- }
-
- /**
- * {@inheritDoc}
- * @throws IllegalArgumentException
- */
- @Override
- public DmaapTopicSource build(List<String> servers,
- String topic,
- String apiKey,
- String apiSecret) {
- return this.build(servers, topic,
- apiKey, apiSecret, null, null,
- null, null,
- DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
- DmaapTopicSource.DEFAULT_LIMIT_FETCH,
- true,
- false,
- false);
- }
-
- /**
- * {@inheritDoc}
- * @throws IllegalArgumentException
- */
- @Override
- public DmaapTopicSource build(List<String> servers, String topic) {
- return this.build(servers, topic, null, null);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void destroy(String topic) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- DmaapTopicSource uebTopicSource;
-
- synchronized(this) {
- if (!dmaapTopicSources.containsKey(topic)) {
- return;
- }
-
- uebTopicSource = dmaapTopicSources.remove(topic);
- }
-
- uebTopicSource.shutdown();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public DmaapTopicSource get(String topic) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized(this) {
- if (dmaapTopicSources.containsKey(topic)) {
- return dmaapTopicSources.get(topic);
- } else {
- throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found");
- }
- }
- }
-
- @Override
- public synchronized List<DmaapTopicSource> inventory() {
- return new ArrayList<>(this.dmaapTopicSources.values());
- }
-
- @Override
- public void destroy() {
- List<DmaapTopicSource> readers = this.inventory();
- for (DmaapTopicSource reader: readers) {
- reader.shutdown();
- }
-
- synchronized(this) {
- this.dmaapTopicSources.clear();
- }
- }
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("IndexedDmaapTopicSourceFactory []");
- return builder.toString();
- }
-
-}
-
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSink.java
deleted file mode 100644
index afc11229..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSink.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.policy.drools.event.comm.bus;
-
-import java.util.List;
-
-import org.onap.policy.drools.event.comm.TopicSink;
-import org.onap.policy.drools.event.comm.bus.internal.TopicBase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * NOOP topic sink
- */
-public class NoopTopicSink extends TopicBase implements TopicSink {
-
- /**
- * factory
- */
- public static final NoopTopicSinkFactory factory = new IndexedNoopTopicSinkFactory();
-
- /**
- * logger
- */
- private static Logger logger = LoggerFactory.getLogger(NoopTopicSink.class);
-
- /**
- * net logger
- */
- private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
-
- /**
- * constructor
- * @param servers servers
- * @param topic topic
- * @throws IllegalArgumentException if an invalid argument has been passed in
- */
- public NoopTopicSink(List<String> servers, String topic) {
- super(servers, topic);
- }
-
- @Override
- public boolean send(String message) {
-
- if (message == null || message.isEmpty())
- throw new IllegalArgumentException("Message to send is empty");
-
- if (!this.alive)
- throw new IllegalStateException(this + " is stopped");
-
- try {
- synchronized (this) {
- this.recentEvents.add(message);
- }
-
- netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(),
- this.topic, System.lineSeparator(), message);
-
- broadcast(message);
- } catch (Exception e) {
- logger.warn("{}: cannot send because of {}", this, e.getMessage(), e);
- return false;
- }
-
- return true;
- }
-
- @Override
- public CommInfrastructure getTopicCommInfrastructure() {
- return CommInfrastructure.NOOP;
- }
-
- @Override
- public boolean start() {
- logger.info("{}: starting", this);
-
- synchronized(this) {
-
- if (this.alive)
- return true;
-
- if (locked)
- throw new IllegalStateException(this + " is locked.");
-
- this.alive = true;
- }
-
- return true;
- }
-
- @Override
- public boolean stop() {
- synchronized(this) {
- this.alive = false;
- }
- return true;
- }
-
- @Override
- public void shutdown() {
- this.stop();
- }
-
- @Override
- public String toString() {
- return "NoopTopicSink [toString()=" + super.toString() + "]";
- }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java
deleted file mode 100644
index 8633d093..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.policy.drools.event.comm.bus;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-
-import org.onap.policy.drools.properties.PolicyProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Noop Topic Sink Factory
- */
-public interface NoopTopicSinkFactory {
-
- /**
- * Creates noop topic sinks based on properties files
- *
- * @param properties Properties containing initialization values
- *
- * @return a noop topic sink
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public List<NoopTopicSink> build(Properties properties);
-
- /**
- * builds a noop sink
- *
- * @param servers list of servers
- * @param topic topic name
- * @param managed is this sink endpoint managed?
- * @return a noop topic sink
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public NoopTopicSink build(List<String> servers, String topic, boolean managed);
-
- /**
- * Destroys a sink based on the topic
- *
- * @param topic topic name
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public void destroy(String topic);
-
- /**
- * gets a sink based on topic name
- *
- * @param topic the topic name
- *
- * @return a sink with topic name
- * @throws IllegalArgumentException if an invalid topic is provided
- * @throws IllegalStateException if the sink is in an incorrect state
- */
- public NoopTopicSink get(String topic);
-
- /**
- * Provides a snapshot of the UEB Topic Writers
- *
- * @return a list of the UEB Topic Writers
- */
- public List<NoopTopicSink> inventory();
-
- /**
- * Destroys all sinks
- */
- public void destroy();
-}
-
-
-/* ------------- implementation ----------------- */
-
-/**
- * Factory of noop sinks
- */
-class IndexedNoopTopicSinkFactory implements NoopTopicSinkFactory {
- private static final String MISSING_TOPIC = "A topic must be provided";
-
-/**
- * Logger
- */
- private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
-
- /**
- * noop topic sinks map
- */
- protected HashMap<String, NoopTopicSink> noopTopicSinks = new HashMap<>();
-
- @Override
- public List<NoopTopicSink> build(Properties properties) {
-
- final String sinkTopics = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS);
- if (sinkTopics == null || sinkTopics.isEmpty()) {
- logger.info("{}: no topic for noop sink", this);
- return new ArrayList<>();
- }
-
- final List<String> sinkTopicList =
- new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*")));
- final List<NoopTopicSink> newSinks = new ArrayList<>();
- synchronized (this) {
- for (final String topic : sinkTopicList) {
- if (this.noopTopicSinks.containsKey(topic)) {
- newSinks.add(this.noopTopicSinks.get(topic));
- continue;
- }
-
- String servers = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "."
- + topic + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
-
- if (servers == null || servers.isEmpty())
- servers = "noop";
-
- final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
-
- final String managedString =
- properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "." + topic
- + PolicyProperties.PROPERTY_MANAGED_SUFFIX);
- boolean managed = true;
- if (managedString != null && !managedString.isEmpty()) {
- managed = Boolean.parseBoolean(managedString);
- }
-
- final NoopTopicSink noopSink = this.build(serverList, topic, managed);
- newSinks.add(noopSink);
- }
- return newSinks;
- }
- }
-
- @Override
- public NoopTopicSink build(List<String> servers, String topic, boolean managed) {
-
- List<String> noopSinkServers = servers;
- if (noopSinkServers == null) {
- noopSinkServers = new ArrayList<>();
- }
-
- if (noopSinkServers.isEmpty()) {
- noopSinkServers.add("noop");
- }
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized (this) {
- if (this.noopTopicSinks.containsKey(topic)) {
- return this.noopTopicSinks.get(topic);
- }
-
- final NoopTopicSink sink = new NoopTopicSink(noopSinkServers, topic);
-
- if (managed)
- this.noopTopicSinks.put(topic, sink);
-
- return sink;
- }
- }
-
- @Override
- public void destroy(String topic) {
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- NoopTopicSink noopSink;
- synchronized (this) {
- if (!this.noopTopicSinks.containsKey(topic)) {
- return;
- }
-
- noopSink = this.noopTopicSinks.remove(topic);
- }
-
- noopSink.shutdown();
- }
-
- @Override
- public void destroy() {
- final List<NoopTopicSink> sinks = this.inventory();
- for (final NoopTopicSink sink : sinks) {
- sink.shutdown();
- }
-
- synchronized (this) {
- this.noopTopicSinks.clear();
- }
- }
-
- @Override
- public NoopTopicSink get(String topic) {
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized (this) {
- if (this.noopTopicSinks.containsKey(topic)) {
- return this.noopTopicSinks.get(topic);
- } else {
- throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
- }
- }
- }
-
- @Override
- public List<NoopTopicSink> inventory() {
- return new ArrayList<>(this.noopTopicSinks.values());
- }
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSink.java
deleted file mode 100644
index 57dd1f1a..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSink.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm.bus;
-
-/**
- * Topic Writer over UEB Infrastructure
- */
-public interface UebTopicSink extends BusTopicSink {
-
- /**
- * Factory of UEB Topic Sinks for instantiation and management purposes
- */
- public static final UebTopicSinkFactory factory = new IndexedUebTopicSinkFactory();
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSinkFactory.java
deleted file mode 100644
index 10468bef..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSinkFactory.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm.bus;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-
-import org.onap.policy.drools.event.comm.bus.internal.InlineUebTopicSink;
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
-import org.onap.policy.drools.properties.PolicyProperties;
-
-/**
- * UEB Topic Sink Factory
- */
-public interface UebTopicSinkFactory {
-
- /**
- * Instantiates a new UEB Topic Writer
- *
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param partitionKey Consumer Group
- * @param managed is this sink endpoint managed?
- *
- * @return an UEB Topic Sink
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public UebTopicSink build(List<String> servers,
- String topic,
- String apiKey,
- String apiSecret,
- String partitionKey,
- boolean managed,
- boolean useHttps,
- boolean allowSelfSignedCerts);
-
- /**
- * Creates an UEB Topic Writer based on properties files
- *
- * @param properties Properties containing initialization values
- *
- * @return an UEB Topic Writer
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public List<UebTopicSink> build(Properties properties);
-
- /**
- * Instantiates a new UEB Topic Writer
- *
- * @param servers list of servers
- * @param topic topic name
- *
- * @return an UEB Topic Writer
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public UebTopicSink build(List<String> servers, String topic);
-
- /**
- * Destroys an UEB Topic Writer based on a topic
- *
- * @param topic topic name
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public void destroy(String topic);
-
- /**
- * gets an UEB Topic Writer based on topic name
- * @param topic the topic name
- *
- * @return an UEB Topic Writer with topic name
- * @throws IllegalArgumentException if an invalid topic is provided
- * @throws IllegalStateException if the UEB Topic Reader is
- * an incorrect state
- */
- public UebTopicSink get(String topic);
-
- /**
- * Provides a snapshot of the UEB Topic Writers
- * @return a list of the UEB Topic Writers
- */
- public List<UebTopicSink> inventory();
-
- /**
- * Destroys all UEB Topic Writers
- */
- public void destroy();
-}
-
-/* ------------- implementation ----------------- */
-
-/**
- * Factory of UEB Reader Topics indexed by topic name
- */
-class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
- private static final String MISSING_TOPIC = "A topic must be provided";
-
- /**
- * Logger
- */
- private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
-
- /**
- * UEB Topic Name Index
- */
- protected HashMap<String, UebTopicSink> uebTopicSinks =
- new HashMap<>();
-
- @Override
- public UebTopicSink build(List<String> servers,
- String topic,
- String apiKey,
- String apiSecret,
- String partitionKey,
- boolean managed,
- boolean useHttps,
- boolean allowSelfSignedCerts) {
-
- if (servers == null || servers.isEmpty()) {
- throw new IllegalArgumentException("UEB Server(s) must be provided");
- }
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized (this) {
- if (uebTopicSinks.containsKey(topic)) {
- return uebTopicSinks.get(topic);
- }
-
- UebTopicSink uebTopicWriter =
- new InlineUebTopicSink(servers, topic,
- apiKey, apiSecret,partitionKey, useHttps, allowSelfSignedCerts);
-
- if (managed)
- uebTopicSinks.put(topic, uebTopicWriter);
-
- return uebTopicWriter;
- }
- }
-
-
- @Override
- public UebTopicSink build(List<String> servers, String topic) {
- return this.build(servers, topic, null, null, null, true, false, false);
- }
-
-
- @Override
- public List<UebTopicSink> build(Properties properties) {
-
- String writeTopics = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS);
- if (writeTopics == null || writeTopics.isEmpty()) {
- logger.info("{}: no topic for UEB Sink", this);
- return new ArrayList<>();
- }
-
- List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
- List<UebTopicSink> newUebTopicSinks = new ArrayList<>();
- synchronized(this) {
- for (String topic: writeTopicList) {
- if (this.uebTopicSinks.containsKey(topic)) {
- newUebTopicSinks.add(this.uebTopicSinks.get(topic));
- continue;
- }
-
- String servers = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." +
- topic +
- PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
- if (servers == null || servers.isEmpty()) {
- logger.error("{}: no UEB servers configured for sink {}", this, topic);
- continue;
- }
-
- List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
-
- String apiKey = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
- String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
- String partitionKey = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
-
- String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic +
- PolicyProperties.PROPERTY_MANAGED_SUFFIX);
- boolean managed = true;
- if (managedString != null && !managedString.isEmpty()) {
- managed = Boolean.parseBoolean(managedString);
- }
-
- String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic +
- PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
-
- //default is to use HTTP if no https property exists
- boolean useHttps = false;
- if (useHttpsString != null && !useHttpsString.isEmpty()){
- useHttps = Boolean.parseBoolean(useHttpsString);
- }
-
-
- String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic +
- PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
-
- //default is to disallow self-signed certs
- boolean allowSelfSignedCerts = false;
- if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
- allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
- }
-
- UebTopicSink uebTopicWriter = this.build(serverList, topic,
- apiKey, apiSecret,
- partitionKey, managed, useHttps, allowSelfSignedCerts);
- newUebTopicSinks.add(uebTopicWriter);
- }
- return newUebTopicSinks;
- }
- }
-
- @Override
- public void destroy(String topic) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- UebTopicSink uebTopicWriter;
- synchronized(this) {
- if (!uebTopicSinks.containsKey(topic)) {
- return;
- }
-
- uebTopicWriter = uebTopicSinks.remove(topic);
- }
-
- uebTopicWriter.shutdown();
- }
-
- @Override
- public void destroy() {
- List<UebTopicSink> writers = this.inventory();
- for (UebTopicSink writer: writers) {
- writer.shutdown();
- }
-
- synchronized(this) {
- this.uebTopicSinks.clear();
- }
- }
-
- @Override
- public UebTopicSink get(String topic) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized(this) {
- if (uebTopicSinks.containsKey(topic)) {
- return uebTopicSinks.get(topic);
- } else {
- throw new IllegalStateException("UebTopicSink for " + topic + " not found");
- }
- }
- }
-
- @Override
- public synchronized List<UebTopicSink> inventory() {
- return new ArrayList<>(this.uebTopicSinks.values());
- }
-
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("IndexedUebTopicSinkFactory []");
- return builder.toString();
- }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSource.java
deleted file mode 100644
index 7d35a993..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSource.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm.bus;
-
-/**
- * Topic Source for UEB Communication Infrastructure
- *
- */
-public interface UebTopicSource extends BusTopicSource {
-
- /**
- * factory for managing and tracking UEB readers
- */
- public static UebTopicSourceFactory factory =
- new IndexedUebTopicSourceFactory();
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSourceFactory.java
deleted file mode 100644
index d48be278..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/UebTopicSourceFactory.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm.bus;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-
-import org.onap.policy.drools.event.comm.bus.internal.SingleThreadedUebTopicSource;
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
-import org.onap.policy.drools.properties.PolicyProperties;
-
-/**
- * UEB Topic Source Factory
- */
-public interface UebTopicSourceFactory {
-
- /**
- * Creates an UEB Topic Source based on properties files
- *
- * @param properties Properties containing initialization values
- *
- * @return an UEB Topic Source
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public List<UebTopicSource> build(Properties properties);
-
- /**
- * Instantiates a new UEB Topic Source
- *
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Read Fetch Timeout
- * @param fetchLimit Fetch Limit
- * @param managed is this source endpoint managed?
- *
- * @return an UEB Topic Source
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public UebTopicSource build(List<String> servers,
- String topic,
- String apiKey,
- String apiSecret,
- String consumerGroup,
- String consumerInstance,
- int fetchTimeout,
- int fetchLimit,
- boolean managed,
- boolean useHttps,
- boolean allowSelfSignedCerts);
-
- /**
- * Instantiates a new UEB Topic Source
- *
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- *
- * @return an UEB Topic Source
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public UebTopicSource build(List<String> servers,
- String topic,
- String apiKey,
- String apiSecret);
-
- /**
- * Instantiates a new UEB Topic Source
- *
- * @param servers list of servers
- * @param topic topic name
- *
- * @return an UEB Topic Source
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public UebTopicSource build(List<String> servers,
- String topic);
-
- /**
- * Destroys an UEB Topic Source based on a topic
- *
- * @param topic topic name
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public void destroy(String topic);
-
- /**
- * Destroys all UEB Topic Sources
- */
- public void destroy();
-
- /**
- * gets an UEB Topic Source based on topic name
- * @param topic the topic name
- * @return an UEB Topic Source with topic name
- * @throws IllegalArgumentException if an invalid topic is provided
- * @throws IllegalStateException if the UEB Topic Source is
- * an incorrect state
- */
- public UebTopicSource get(String topic);
-
- /**
- * Provides a snapshot of the UEB Topic Sources
- * @return a list of the UEB Topic Sources
- */
- public List<UebTopicSource> inventory();
-}
-
-/* ------------- implementation ----------------- */
-
-/**
- * Factory of UEB Source Topics indexed by topic name
- */
-class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
- private static final String MISSING_TOPIC = "A topic must be provided";
-
- /**
- * Logger
- */
- private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class);
-
- /**
- * UEB Topic Name Index
- */
- protected HashMap<String, UebTopicSource> uebTopicSources =
- new HashMap<>();
-
- /**
- * {@inheritDoc}
- */
- @Override
- public UebTopicSource build(List<String> servers,
- String topic,
- String apiKey,
- String apiSecret,
- String consumerGroup,
- String consumerInstance,
- int fetchTimeout,
- int fetchLimit,
- boolean managed,
- boolean useHttps,
- boolean allowSelfSignedCerts) {
- if (servers == null || servers.isEmpty()) {
- throw new IllegalArgumentException("UEB Server(s) must be provided");
- }
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized(this) {
- if (uebTopicSources.containsKey(topic)) {
- return uebTopicSources.get(topic);
- }
-
- UebTopicSource uebTopicSource =
- new SingleThreadedUebTopicSource(servers, topic,
- apiKey, apiSecret,
- consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
-
- if (managed)
- uebTopicSources.put(topic, uebTopicSource);
-
- return uebTopicSource;
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public List<UebTopicSource> build(Properties properties) {
-
- String readTopics = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS);
- if (readTopics == null || readTopics.isEmpty()) {
- logger.info("{}: no topic for UEB Source", this);
- return new ArrayList<>();
- }
- List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
-
- List<UebTopicSource> newUebTopicSources = new ArrayList<>();
- synchronized(this) {
- for (String topic: readTopicList) {
- if (this.uebTopicSources.containsKey(topic)) {
- newUebTopicSources.add(this.uebTopicSources.get(topic));
- continue;
- }
-
- String servers = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." +
- topic +
- PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
-
- if (servers == null || servers.isEmpty()) {
- logger.error("{}: no UEB servers configured for sink {}", this, topic);
- continue;
- }
-
- List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
-
- String apiKey = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
-
- String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
-
- String consumerGroup = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
-
- String consumerInstance = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
-
- String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
- int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
- if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
- try {
- fetchTimeout = Integer.parseInt(fetchTimeoutString);
- } catch (NumberFormatException nfe) {
- logger.warn("{}: fetch timeout {} is in invalid format for topic {} ",
- this, fetchTimeoutString, topic);
- }
- }
-
- String fetchLimitString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS +
- "." + topic +
- PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
- int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH;
- if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
- try {
- fetchLimit = Integer.parseInt(fetchLimitString);
- } catch (NumberFormatException nfe) {
- logger.warn("{}: fetch limit {} is in invalid format for topic {} ",
- this, fetchLimitString, topic);
- }
- }
-
- String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." +
- topic + PolicyProperties.PROPERTY_MANAGED_SUFFIX);
- boolean managed = true;
- if (managedString != null && !managedString.isEmpty()) {
- managed = Boolean.parseBoolean(managedString);
- }
-
- String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic +
- PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
-
- //default is to use HTTP if no https property exists
- boolean useHttps = false;
- if (useHttpsString != null && !useHttpsString.isEmpty()){
- useHttps = Boolean.parseBoolean(useHttpsString);
- }
-
- String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic +
- PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
-
- //default is to disallow self-signed certs
- boolean allowSelfSignedCerts = false;
- if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
- allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
- }
-
- UebTopicSource uebTopicSource = this.build(serverList, topic,
- apiKey, apiSecret,
- consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit, managed, useHttps, allowSelfSignedCerts);
- newUebTopicSources.add(uebTopicSource);
- }
- }
- return newUebTopicSources;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public UebTopicSource build(List<String> servers,
- String topic,
- String apiKey,
- String apiSecret) {
-
- return this.build(servers, topic,
- apiKey, apiSecret,
- null, null,
- UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
- UebTopicSource.DEFAULT_LIMIT_FETCH, true, false, true);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public UebTopicSource build(List<String> servers, String topic) {
- return this.build(servers, topic, null, null);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void destroy(String topic) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- UebTopicSource uebTopicSource;
-
- synchronized(this) {
- if (!uebTopicSources.containsKey(topic)) {
- return;
- }
-
- uebTopicSource = uebTopicSources.remove(topic);
- }
-
- uebTopicSource.shutdown();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public UebTopicSource get(String topic) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized(this) {
- if (uebTopicSources.containsKey(topic)) {
- return uebTopicSources.get(topic);
- } else {
- throw new IllegalStateException("UebTopiceSource for " + topic + " not found");
- }
- }
- }
-
- @Override
- public synchronized List<UebTopicSource> inventory() {
- return new ArrayList<>(this.uebTopicSources.values());
- }
-
- @Override
- public void destroy() {
- List<UebTopicSource> readers = this.inventory();
- for (UebTopicSource reader: readers) {
- reader.shutdown();
- }
-
- synchronized(this) {
- this.uebTopicSources.clear();
- }
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("IndexedUebTopicSourceFactory []");
- return builder.toString();
- }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
deleted file mode 100644
index 828bb920..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
+++ /dev/null
@@ -1,546 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm.bus.internal;
-
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import org.onap.policy.drools.event.comm.bus.DmaapTopicSinkFactory;
-import org.onap.policy.drools.properties.PolicyProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-import com.att.nsa.cambria.client.CambriaConsumer;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.impl.MRConsumerImpl;
-import com.att.nsa.mr.client.response.MRConsumerResponse;
-import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
-
-/**
- * Wrapper around libraries to consume from message bus
- *
- */
-public interface BusConsumer {
-
- /**
- * fetch messages
- *
- * @return list of messages
- * @throws Exception when error encountered by underlying libraries
- */
- public Iterable<String> fetch() throws InterruptedException, IOException;
-
- /**
- * close underlying library consumer
- */
- public void close();
-
- /**
- * BusConsumer that supports server-side filtering.
- */
- public interface FilterableBusConsumer extends BusConsumer {
-
- /**
- * Sets the server-side filter.
- *
- * @param filter new filter value, or {@code null}
- * @throws IllegalArgumentException if the consumer cannot be built with
- * the new filter
- */
- public void setFilter(String filter);
- }
-
- /**
- * Cambria based consumer
- */
- public static class CambriaConsumerWrapper implements FilterableBusConsumer {
-
- /**
- * logger
- */
- private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
-
- /**
- * Used to build the consumer.
- */
- private final ConsumerBuilder builder;
-
- /**
- * Locked while updating {@link #consumer} and {@link #newConsumer}.
- */
- private final Object consLocker = new Object();
-
- /**
- * Cambria client
- */
- private CambriaConsumer consumer;
-
- /**
- * Cambria client to use for next fetch
- */
- private CambriaConsumer newConsumer = null;
-
- /**
- * fetch timeout
- */
- protected int fetchTimeout;
-
- /**
- * close condition
- */
- protected Object closeCondition = new Object();
-
- /**
- * Cambria Consumer Wrapper
- *
- * @param servers messaging bus hosts
- * @param topic topic
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Fetch Timeout
- * @param fetchLimit Fetch Limit
- * @throws GeneralSecurityException
- * @throws MalformedURLException
- */
- public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey,
- String apiSecret, String consumerGroup, String consumerInstance, int fetchTimeout,
- int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) {
- this(servers, topic, apiKey, apiSecret, null, null,
- consumerGroup, consumerInstance, fetchTimeout, fetchLimit,
- useHttps, useSelfSignedCerts);
- }
-
- public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey,
- String apiSecret, String username, String password,
- String consumerGroup, String consumerInstance, int fetchTimeout,
- int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) {
-
- this.fetchTimeout = fetchTimeout;
-
- this.builder = new CambriaClientBuilders.ConsumerBuilder();
-
- builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
- .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit);
-
- // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
- builder.withSocketTimeout(fetchTimeout + 30000);
-
- if (useHttps) {
- builder.usingHttps();
-
- if (useSelfSignedCerts) {
- builder.allowSelfSignedCertificates();
- }
- }
-
- if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) {
- builder.authenticatedBy(apiKey, apiSecret);
- }
-
- if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) {
- builder.authenticatedByHttp(username, password);
- }
-
- try {
- this.consumer = builder.build();
- } catch (MalformedURLException | GeneralSecurityException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- @Override
- public Iterable<String> fetch() throws IOException, InterruptedException {
- try {
- return getCurrentConsumer().fetch();
- } catch (final IOException e) {
- logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
- this.fetchTimeout);
- synchronized (this.closeCondition) {
- this.closeCondition.wait(this.fetchTimeout);
- }
-
- throw e;
- }
- }
-
- @Override
- public void close() {
- synchronized (closeCondition) {
- closeCondition.notifyAll();
- }
-
- getCurrentConsumer().close();
- }
-
- private CambriaConsumer getCurrentConsumer() {
- CambriaConsumer old = null;
- CambriaConsumer ret;
-
- synchronized(consLocker) {
- if(this.newConsumer != null) {
- // replace old consumer with new consumer
- old = this.consumer;
- this.consumer = this.newConsumer;
- this.newConsumer = null;
- }
-
- ret = this.consumer;
- }
-
- if(old != null) {
- old.close();
- }
-
- return ret;
- }
-
- @Override
- public void setFilter(String filter) {
- logger.info("{}: setting DMAAP server-side filter: {}", this, filter);
- builder.withServerSideFilter(filter);
-
- try {
- CambriaConsumer previous;
- synchronized(consLocker) {
- previous = this.newConsumer;
- this.newConsumer = builder.build();
- }
-
- if(previous != null) {
- // there was already a new consumer - close it
- previous.close();
- }
-
- } catch (MalformedURLException | GeneralSecurityException e) {
- /*
- * Since an exception occurred, "consumer" still has its old value,
- * thus it should not be closed at this point.
- */
- throw new IllegalArgumentException(e);
- }
- }
-
- @Override
- public String toString() {
- return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
- }
- }
-
- /**
- * MR based consumer
- */
- public abstract class DmaapConsumerWrapper implements BusConsumer {
-
- /**
- * logger
- */
- private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
-
- /**
- * Name of the "protocol" property.
- */
- protected static final String PROTOCOL_PROP = "Protocol";
-
- /**
- * fetch timeout
- */
- protected int fetchTimeout;
-
- /**
- * close condition
- */
- protected Object closeCondition = new Object();
-
- /**
- * MR Consumer
- */
- protected MRConsumerImpl consumer;
-
- /**
- * MR Consumer Wrapper
- *
- * @param servers messaging bus hosts
- * @param topic topic
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param username AAF Login
- * @param password AAF Password
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Fetch Timeout
- * @param fetchLimit Fetch Limit
- * @throws MalformedURLException
- */
- public DmaapConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
- String username, String password, String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit) throws MalformedURLException {
-
- this.fetchTimeout = fetchTimeout;
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException("No topic for DMaaP");
- }
-
- this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit, null, apiKey, apiSecret);
-
- this.consumer.setUsername(username);
- this.consumer.setPassword(password);
- }
-
- @Override
- public Iterable<String> fetch() throws InterruptedException, IOException {
- final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
- if (response == null) {
- logger.warn("{}: DMaaP NULL response received", this);
-
- synchronized (closeCondition) {
- closeCondition.wait(fetchTimeout);
- }
- return new ArrayList<>();
- } else {
- logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(),
- response.getResponseMessage());
-
- if (response.getResponseCode() == null || !"200".equals(response.getResponseCode())) {
-
- logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
- response.getResponseMessage());
-
- synchronized (closeCondition) {
- closeCondition.wait(fetchTimeout);
- }
-
- /* fall through */
- }
- }
-
- if (response.getActualMessages() == null)
- return new ArrayList<>();
- else
- return response.getActualMessages();
- }
-
- @Override
- public void close() {
- synchronized (closeCondition) {
- closeCondition.notifyAll();
- }
-
- this.consumer.close();
- }
-
- @Override
- public String toString() {
- return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
- + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()="
- + consumer.getHost() + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag()
- + ", consumer.getUsername()=" + consumer.getUsername() + "]";
- }
- }
-
- /**
- * MR based consumer
- */
- public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
-
- private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
-
- private final Properties props;
-
- /**
- * MR Consumer Wrapper
- *
- * @param servers messaging bus hosts
- * @param topic topic
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param aafLogin AAF Login
- * @param aafPassword AAF Password
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Fetch Timeout
- * @param fetchLimit Fetch Limit
- * @throws MalformedURLException
- */
- public DmaapAafConsumerWrapper(List<String> servers, String topic, String apiKey,
- String apiSecret, String aafLogin, String aafPassword, String consumerGroup,
- String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps)
- throws MalformedURLException {
-
- super(servers, topic, apiKey, apiSecret, aafLogin, aafPassword, consumerGroup,
- consumerInstance, fetchTimeout, fetchLimit);
-
- // super constructor sets servers = {""} if empty to avoid errors when using DME2
- if ((servers.size() == 1 && ("".equals(servers.get(0)))) || (servers == null)
- || (servers.isEmpty())) {
- throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
- }
-
- this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
-
- props = new Properties();
-
- if (useHttps) {
- props.setProperty(PROTOCOL_PROP, "https");
- this.consumer.setHost(servers.get(0) + ":3905");
-
- } else {
- props.setProperty(PROTOCOL_PROP, "http");
- this.consumer.setHost(servers.get(0) + ":3904");
- }
-
- this.consumer.setProps(props);
- logger.info("{}: CREATION", this);
- }
-
- @Override
- public String toString() {
- final MRConsumerImpl consumer = this.consumer;
-
- return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
- + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()="
- + consumer.getHost() + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag()
- + ", consumer.getUsername()=" + consumer.getUsername() + "]";
- }
- }
-
- public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
-
- private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
-
- private final Properties props;
-
- public DmaapDmeConsumerWrapper(List<String> servers, String topic, String apiKey,
- String apiSecret, String dme2Login, String dme2Password, String consumerGroup,
- String consumerInstance, int fetchTimeout, int fetchLimit, String environment,
- String aftEnvironment, String dme2Partner, String latitude, String longitude,
- Map<String, String> additionalProps, boolean useHttps) throws MalformedURLException {
-
-
-
- super(servers, topic, apiKey, apiSecret, dme2Login, dme2Password, consumerGroup,
- consumerInstance, fetchTimeout, fetchLimit);
-
-
- final String dme2RouteOffer =
- additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
-
- if (environment == null || environment.isEmpty()) {
- throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
- }
- if (aftEnvironment == null || aftEnvironment.isEmpty()) {
- throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
- }
- if (latitude == null || latitude.isEmpty()) {
- throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
- }
- if (longitude == null || longitude.isEmpty()) {
- throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
- }
-
- if ((dme2Partner == null || dme2Partner.isEmpty())
- && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
- throw new IllegalArgumentException(
- "Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
- + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
- + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
- + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
- }
-
- final String serviceName = servers.get(0);
-
- this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
-
- this.consumer.setUsername(dme2Login);
- this.consumer.setPassword(dme2Password);
-
- props = new Properties();
-
- props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
-
- props.setProperty("username", dme2Login);
- props.setProperty("password", dme2Password);
-
- /* These are required, no defaults */
- props.setProperty("topic", topic);
-
- props.setProperty("Environment", environment);
- props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
-
- if (dme2Partner != null)
- props.setProperty("Partner", dme2Partner);
- if (dme2RouteOffer != null)
- props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
-
- props.setProperty("Latitude", latitude);
- props.setProperty("Longitude", longitude);
-
- /* These are optional, will default to these values if not set in additionalProps */
- props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
- props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
- props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
- props.setProperty("Version", "1.0");
- props.setProperty("SubContextPath", "/");
- props.setProperty("sessionstickinessrequired", "no");
-
- /* These should not change */
- props.setProperty("TransportType", "DME2");
- props.setProperty("MethodType", "GET");
-
- if (useHttps) {
- props.setProperty(PROTOCOL_PROP, "https");
-
- } else {
- props.setProperty(PROTOCOL_PROP, "http");
- }
-
- props.setProperty("contenttype", "application/json");
-
- if (additionalProps != null) {
- for (Map.Entry<String, String> entry : additionalProps.entrySet())
- props.put(entry.getKey(), entry.getValue());
- }
-
- MRClientFactory.prop = props;
- this.consumer.setProps(props);
-
- logger.info("{}: CREATION", this);
- }
-
- private IllegalArgumentException parmException(String topic, String propnm) {
- return new IllegalArgumentException(
- "Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
- + propnm + " property for DME2 in DMaaP");
-
- }
- }
-}
-
-
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java
deleted file mode 100644
index 1efaa063..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm.bus.internal;
-
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import org.onap.policy.drools.event.comm.bus.DmaapTopicSinkFactory;
-import org.onap.policy.drools.properties.PolicyProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
-import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
-import com.att.nsa.mr.client.response.MRPublisherResponse;
-import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-public interface BusPublisher {
-
- /**
- * sends a message
- *
- * @param partition id
- * @param message the message
- * @return true if success, false otherwise
- * @throws IllegalArgumentException if no message provided
- */
- public boolean send(String partitionId, String message);
-
- /**
- * closes the publisher
- */
- public void close();
-
- /**
- * Cambria based library publisher
- */
- public static class CambriaPublisherWrapper implements BusPublisher {
-
- private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
-
- /**
- * The actual Cambria publisher
- */
- @JsonIgnore
- protected volatile CambriaBatchingPublisher publisher;
-
- public CambriaPublisherWrapper(List<String> servers, String topic,
- String apiKey,
- String apiSecret, boolean useHttps) {
- this(servers, topic, apiKey, apiSecret, null, null, useHttps);
- }
-
- public CambriaPublisherWrapper(List<String> servers, String topic,
- String apiKey, String apiSecret,
- String username, String password,
- boolean useHttps) {
-
- PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
-
- builder.usingHosts(servers).onTopic(topic);
-
- // Set read timeout to 30 seconds (TBD: this should be configurable)
- builder.withSocketTimeout(30000);
-
- if (useHttps){
- builder.usingHttps();
- }
-
-
- if (apiKey != null && !apiKey.isEmpty() &&
- apiSecret != null && !apiSecret.isEmpty()) {
- builder.authenticatedBy(apiKey, apiSecret);
- }
-
- if (username != null && !username.isEmpty() &&
- password != null && !password.isEmpty()) {
- builder.authenticatedByHttp(username, password);
- }
-
- try {
- this.publisher = builder.build();
- } catch (MalformedURLException | GeneralSecurityException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean send(String partitionId, String message) {
- if (message == null)
- throw new IllegalArgumentException("No message provided");
-
- try {
- this.publisher.send(partitionId, message);
- } catch (Exception e) {
- logger.warn("{}: SEND of {} cannot be performed because of {}",
- this, message, e.getMessage(), e);
- return false;
- }
- return true;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void close() {
- logger.info("{}: CLOSE", this);
-
- try {
- this.publisher.close();
- } catch (Exception e) {
- logger.warn("{}: CLOSE FAILED because of {}",
- this, e.getMessage(),e);
- }
- }
-
-
- @Override
- public String toString() {
- return "CambriaPublisherWrapper []";
- }
-
- }
-
- /**
- * DmaapClient library wrapper
- */
- public abstract class DmaapPublisherWrapper implements BusPublisher {
-
- private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
-
- /**
- * MR based Publisher
- */
- protected MRSimplerBatchPublisher publisher;
- protected Properties props;
-
- /**
- * MR Publisher Wrapper
- *
- * @param servers messaging bus hosts
- * @param topic topic
- * @param username AAF or DME2 Login
- * @param password AAF or DME2 Password
- */
- public DmaapPublisherWrapper(ProtocolTypeConstants protocol,
- List<String> servers, String topic,
- String username,
- String password, boolean useHttps) {
-
-
- if (topic == null || topic.isEmpty())
- throw new IllegalArgumentException("No topic for DMaaP");
-
-
- if (protocol == ProtocolTypeConstants.AAF_AUTH) {
- if (servers == null || servers.isEmpty())
- throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
-
- ArrayList<String> dmaapServers = new ArrayList<>();
- if(useHttps){
- for (String server: servers) {
- dmaapServers.add(server + ":3905");
- }
-
- }
- else{
- for (String server: servers) {
- dmaapServers.add(server + ":3904");
- }
- }
-
-
- this.publisher =
- new MRSimplerBatchPublisher.Builder().
- againstUrls(dmaapServers).
- onTopic(topic).
- build();
-
- this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
- } else if (protocol == ProtocolTypeConstants.DME2) {
- ArrayList<String> dmaapServers = new ArrayList<>();
- dmaapServers.add("0.0.0.0:3904");
-
- this.publisher =
- new MRSimplerBatchPublisher.Builder().
- againstUrls(dmaapServers).
- onTopic(topic).
- build();
-
- this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
- }
-
- this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
-
- this.publisher.setUsername(username);
- this.publisher.setPassword(password);
-
- props = new Properties();
-
- if (useHttps) {
- props.setProperty("Protocol", "https");
- } else {
- props.setProperty("Protocol", "http");
- }
-
- props.setProperty("contenttype", "application/json");
- props.setProperty("username", username);
- props.setProperty("password", password);
-
- props.setProperty("topic", topic);
-
- this.publisher.setProps(props);
-
- if (protocol == ProtocolTypeConstants.AAF_AUTH)
- this.publisher.setHost(servers.get(0));
-
- logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void close() {
- logger.info("{}: CLOSE", this);
-
- try {
- this.publisher.close(1, TimeUnit.SECONDS);
- } catch (Exception e) {
- logger.warn("{}: CLOSE FAILED because of {}",
- this, e.getMessage(), e);
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean send(String partitionId, String message) {
- if (message == null)
- throw new IllegalArgumentException("No message provided");
-
- this.publisher.setPubResponse(new MRPublisherResponse());
- this.publisher.send(partitionId, message);
- MRPublisherResponse response = this.publisher.sendBatchWithResponse();
- if (response != null) {
- logger.debug("DMaaP publisher received {} : {}",
- response.getResponseCode(),
- response.getResponseMessage());
- }
-
- return true;
- }
-
- @Override
- public String toString() {
- return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
- + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
- + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
- + ", publisher.getUsername()=" + publisher.getUsername() + "]";
- }
- }
-
- /**
- * DmaapClient library wrapper
- */
- public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
- /**
- * MR based Publisher
- */
- public DmaapAafPublisherWrapper(List<String> servers, String topic,
- String aafLogin,
- String aafPassword, boolean useHttps) {
-
- super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
- }
- }
-
- public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
- public DmaapDmePublisherWrapper(List<String> servers, String topic,
- String username, String password,
- String environment, String aftEnvironment, String dme2Partner,
- String latitude, String longitude, Map<String,String> additionalProps, boolean useHttps) {
-
- super(ProtocolTypeConstants.DME2, servers, topic, username, password, useHttps);
-
-
-
-
-
-
- String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
-
- if (environment == null || environment.isEmpty()) {
- throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
- }
- if (aftEnvironment == null || aftEnvironment.isEmpty()) {
- throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
- }
- if (latitude == null || latitude.isEmpty()) {
- throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
- }
- if (longitude == null || longitude.isEmpty()) {
- throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
- }
-
- if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
- throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
- "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " +
- PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
- }
-
- String serviceName = servers.get(0);
-
- /* These are required, no defaults */
- props.setProperty("Environment", environment);
- props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
-
- props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
-
- if (dme2Partner != null)
- props.setProperty("Partner", dme2Partner);
- if (dme2RouteOffer != null)
- props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
-
- props.setProperty("Latitude", latitude);
- props.setProperty("Longitude", longitude);
-
- // ServiceName also a default, found in additionalProps
-
- /* These are optional, will default to these values if not set in optionalProps */
- props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
- props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
- props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
- props.setProperty("Version", "1.0");
- props.setProperty("SubContextPath", "/");
- props.setProperty("sessionstickinessrequired", "no");
-
- /* These should not change */
- props.setProperty("TransportType", "DME2");
- props.setProperty("MethodType", "POST");
-
- for (Map.Entry<String,String> entry : additionalProps.entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
-
- if (value != null)
- props.setProperty(key, value);
- }
-
- this.publisher.setProps(props);
- }
-
- private IllegalArgumentException parmException(String topic, String propnm) {
- return new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
- "." + topic + propnm + " property for DME2 in DMaaP");
-
- }
- }
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusTopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusTopicBase.java
deleted file mode 100644
index 0bf3d445..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusTopicBase.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm.bus.internal;
-
-import java.util.List;
-
-import org.onap.policy.drools.event.comm.bus.ApiKeyEnabled;
-
-/**
- * Bus Topic Base
- */
-public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled {
-
- /**
- * API Key
- */
- protected String apiKey;
-
- /**
- * API Secret
- */
- protected String apiSecret;
-
- /**
- * Use https
- */
- protected boolean useHttps;
-
- /**
- * allow self signed certificates
- */
- protected boolean allowSelfSignedCerts;
-
- /**
- * Instantiates a new Bus Topic Base
- *
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param useHttps does connection use HTTPS?
- * @param allowSelfSignedCerts are self-signed certificates allow
- *
- * @return a Bus Topic Base
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public BusTopicBase(List<String> servers,
- String topic,
- String apiKey,
- String apiSecret,
- boolean useHttps,
- boolean allowSelfSignedCerts) {
-
- super(servers, topic);
-
- this.apiKey = apiKey;
- this.apiSecret = apiSecret;
- this.useHttps = useHttps;
- this.allowSelfSignedCerts = allowSelfSignedCerts;
- }
-
- @Override
- public String getApiKey() {
- return apiKey;
- }
-
- @Override
- public String getApiSecret() {
- return apiSecret;
- }
-
- /**
- * @return if using https
- */
- public boolean isUseHttps(){
- return useHttps;
- }
-
- /**
- * @return if self signed certificates are allowed
- */
- public boolean isAllowSelfSignedCerts(){
- return allowSelfSignedCerts;
- }
-
- protected boolean anyNullOrEmpty(String... args) {
- for (String arg : args) {
- if (arg == null || arg.isEmpty()) {
- return true;
- }
- }
-
- return false;
- }
-
- protected boolean allNullOrEmpty(String... args) {
- for (String arg : args) {
- if (!(arg == null || arg.isEmpty())) {
- return false;
- }
- }
-
- return true;
- }
-
-
- @Override
- public String toString() {
- return "BusTopicBase [apiKey=" + apiKey + ", apiSecret=" + apiSecret + ", useHttps=" + useHttps
- + ", allowSelfSignedCerts=" + allowSelfSignedCerts + ", toString()=" + super.toString() + "]";
- }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java
deleted file mode 100644
index a50d7b10..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm.bus.internal;
-
-import java.util.List;
-import java.util.UUID;
-
-import org.onap.policy.drools.event.comm.bus.BusTopicSink;
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
-
-/**
- * Transport Agnostic Bus Topic Sink to carry out the core functionality
- * to interact with a sink regardless if it is UEB or DMaaP.
- *
- */
-public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink {
-
- /**
- * loggers
- */
- private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
- private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
-
- /**
- * The partition key to publish to
- */
- protected String partitionId;
-
- /**
- * message bus publisher
- */
- protected BusPublisher publisher;
-
- /**
- * constructor for abstract sink
- *
- * @param servers servers
- * @param topic topic
- * @param apiKey api secret
- * @param apiSecret api secret
- * @param partitionId partition id
- * @param useHttps does connection use HTTPS?
- * @param allowSelfSignedCerts are self-signed certificates allow
- * @throws IllegalArgumentException in invalid parameters are passed in
- */
- public InlineBusTopicSink(List<String> servers, String topic,
- String apiKey, String apiSecret, String partitionId, boolean useHttps, boolean allowSelfSignedCerts) {
-
- super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
-
- if (partitionId == null || partitionId.isEmpty()) {
- this.partitionId = UUID.randomUUID ().toString();
- }
- }
-
- /**
- * Initialize the Bus publisher
- */
- public abstract void init();
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean start() {
- logger.info("{}: starting", this);
-
- synchronized(this) {
-
- if (this.alive)
- return true;
-
- if (locked)
- throw new IllegalStateException(this + " is locked.");
-
- this.alive = true;
- }
-
- this.init();
- return true;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean stop() {
-
- BusPublisher publisherCopy;
- synchronized(this) {
- this.alive = false;
- publisherCopy = this.publisher;
- this.publisher = null;
- }
-
- if (publisherCopy != null) {
- try {
- publisherCopy.close();
- } catch (Exception e) {
- logger.warn("{}: cannot stop publisher because of {}",
- this, e.getMessage(), e);
- }
- } else {
- logger.warn("{}: there is no publisher", this);
- return false;
- }
-
- return true;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean send(String message) {
-
- if (message == null || message.isEmpty()) {
- throw new IllegalArgumentException("Message to send is empty");
- }
-
- if (!this.alive) {
- throw new IllegalStateException(this + " is stopped");
- }
-
- try {
- synchronized (this) {
- this.recentEvents.add(message);
- }
-
- netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(),
- this.topic, System.lineSeparator(), message);
-
- publisher.send(this.partitionId, message);
- broadcast(message);
- } catch (Exception e) {
- logger.warn("{}: cannot send because of {}", this, e.getMessage(), e);
- return false;
- }
-
- return true;
- }
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void setPartitionKey(String partitionKey) {
- this.partitionId = partitionKey;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public String getPartitionKey() {
- return this.partitionId;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void shutdown() {
- this.stop();
- }
-
- protected boolean anyNullOrEmpty(String... args) {
- for (String arg : args) {
- if (arg == null || arg.isEmpty()) {
- return true;
- }
- }
-
- return false;
- }
-
- protected boolean allNullOrEmpty(String... args) {
- for (String arg : args) {
- if (!(arg == null || arg.isEmpty())) {
- return false;
- }
- }
-
- return true;
- }
-
-
- @Override
- public String toString() {
- return "InlineBusTopicSink [partitionId=" + partitionId + ", alive=" + alive + ", publisher=" + publisher + "]";
- }
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
deleted file mode 100644
index 48116e34..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm.bus.internal;
-
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
-import org.onap.policy.drools.event.comm.Topic;
-import org.onap.policy.drools.event.comm.bus.DmaapTopicSink;
-
-/**
- * This implementation publishes events for the associated DMAAP topic,
- * inline with the calling thread.
- */
-public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTopicSink {
-
- protected static Logger logger =
- LoggerFactory.getLogger(InlineDmaapTopicSink.class);
-
- protected final String userName;
- protected final String password;
-
- protected String environment = null;
- protected String aftEnvironment = null;
- protected String partner = null;
- protected String latitude = null;
- protected String longitude = null;
-
- protected Map<String,String> additionalProps = null;
-
- /**
- *
- * @param servers DMaaP servers
- * @param topic DMaaP Topic to be monitored
- * @param apiKey DMaaP API Key (optional)
- * @param apiSecret DMaaP API Secret (optional)
- * @param consumerGroup DMaaP Reader Consumer Group
- * @param consumerInstance DMaaP Reader Instance
- * @param fetchTimeout DMaaP fetch timeout
- * @param fetchLimit DMaaP fetch limit
- * @param environment DME2 Environment
- * @param aftEnvironment DME2 AFT Environment
- * @param partner DME2 Partner
- * @param latitude DME2 Latitude
- * @param longitude DME2 Longitude
- * @param additionalProps Additional properties to pass to DME2
- * @param useHttps does connection use HTTPS?
- * @param allowSelfSignedCerts are self-signed certificates allow
- *
- * @throws IllegalArgumentException An invalid parameter passed in
- */
- public InlineDmaapTopicSink(List<String> servers, String topic,
- String apiKey, String apiSecret,
- String userName, String password,
- String partitionKey,
- String environment, String aftEnvironment, String partner,
- String latitude, String longitude, Map<String,String> additionalProps,
- boolean useHttps, boolean allowSelfSignedCerts) {
-
- super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
-
- this.userName = userName;
- this.password = password;
-
- this.environment = environment;
- this.aftEnvironment = aftEnvironment;
- this.partner = partner;
-
- this.latitude = latitude;
- this.longitude = longitude;
-
- this.additionalProps = additionalProps;
- }
-
- public InlineDmaapTopicSink(List<String> servers, String topic,
- String apiKey, String apiSecret,
- String userName, String password,
- String partitionKey, boolean useHttps, boolean allowSelfSignedCerts) {
-
- super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
-
- this.userName = userName;
- this.password = password;
- }
-
-
- @Override
- public void init() {
- if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
- this.publisher =
- new BusPublisher.CambriaPublisherWrapper(this.servers,
- this.topic,
- this.apiKey, this.apiSecret,
- this.userName, this.password,
- this.useHttps);
- } else {
- this.publisher =
- new BusPublisher.DmaapDmePublisherWrapper(this.servers, this.topic,
- this.userName, this.password,
- this.environment, this.aftEnvironment,
- this.partner, this.latitude, this.longitude,
- this.additionalProps, this.useHttps);
- }
-
- logger.info("{}: DMAAP SINK created", this);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public CommInfrastructure getTopicCommInfrastructure() {
- return Topic.CommInfrastructure.DMAAP;
- }
-
-
- @Override
- public String toString() {
- return "InlineDmaapTopicSink [userName=" + userName + ", password=" + password
- + ", getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()="
- + super.toString() + "]";
- }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java
deleted file mode 100644
index d1218f3f..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm.bus.internal;
-
-import java.util.List;
-
-import org.onap.policy.drools.event.comm.Topic;
-import org.onap.policy.drools.event.comm.bus.UebTopicSink;
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
-
-/**
- * This implementation publishes events for the associated UEB topic,
- * inline with the calling thread.
- */
-public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSink {
-
- /**
- * logger
- */
- private static Logger logger = LoggerFactory.getLogger(InlineUebTopicSink.class);
-
- /**
- * Argument-based UEB Topic Writer instantiation
- *
- * @param servers list of UEB servers available for publishing
- * @param topic the topic to publish to
- * @param apiKey the api key (optional)
- * @param apiSecret the api secret (optional)
- * @param partitionId the partition key (optional, autogenerated if not provided)
- * @param useHttps does connection use HTTPS?
- * @param allowSelfSignedCerts are self-signed certificates allow
- *
- * @throws IllegalArgumentException if invalid arguments are detected
- */
- public InlineUebTopicSink(List<String> servers,
- String topic,
- String apiKey,
- String apiSecret,
- String partitionId,
- boolean useHttps,
- boolean allowSelfSignedCerts) {
- super(servers, topic, apiKey, apiSecret, partitionId, useHttps, allowSelfSignedCerts);
- }
-
- /**
- * Instantiation of internal resources
- */
- @Override
- public void init() {
-
- this.publisher =
- new BusPublisher.CambriaPublisherWrapper(this.servers,
- this.topic,
- this.apiKey,
- this.apiSecret,
- this.useHttps);
- logger.info("{}: UEB SINK created", this);
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("InlineUebTopicSink [getTopicCommInfrastructure()=").append(getTopicCommInfrastructure())
- .append(", toString()=").append(super.toString()).append("]");
- return builder.toString();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public CommInfrastructure getTopicCommInfrastructure() {
- return Topic.CommInfrastructure.UEB;
- }
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
deleted file mode 100644
index 768046d0..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm.bus.internal;
-
-import java.net.MalformedURLException;
-import java.util.List;
-import java.util.UUID;
-import org.onap.policy.drools.event.comm.FilterableTopicSource;
-import org.onap.policy.drools.event.comm.TopicListener;
-import org.onap.policy.drools.event.comm.bus.BusTopicSource;
-import org.onap.policy.drools.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
-import org.onap.policy.drools.utils.NetworkUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This topic source implementation specializes in reading messages
- * over a bus topic source and notifying its listeners
- */
-public abstract class SingleThreadedBusTopicSource
- extends BusTopicBase
- implements Runnable, BusTopicSource, FilterableTopicSource {
-
- /**
- * Not to be converted to PolicyLogger.
- * This will contain all instract /out traffic and only that in a single file in a concise format.
- */
- private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
- private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
-
- /**
- * Bus consumer group
- */
- protected final String consumerGroup;
-
- /**
- * Bus consumer instance
- */
- protected final String consumerInstance;
-
- /**
- * Bus fetch timeout
- */
- protected final int fetchTimeout;
-
- /**
- * Bus fetch limit
- */
- protected final int fetchLimit;
-
- /**
- * Message Bus Consumer
- */
- protected BusConsumer consumer;
-
- /**
- * Independent thread reading message over my topic
- */
- protected Thread busPollerThread;
-
-
- /**
- *
- * @param servers Bus servers
- * @param topic Bus Topic to be monitored
- * @param apiKey Bus API Key (optional)
- * @param apiSecret Bus API Secret (optional)
- * @param consumerGroup Bus Reader Consumer Group
- * @param consumerInstance Bus Reader Instance
- * @param fetchTimeout Bus fetch timeout
- * @param fetchLimit Bus fetch limit
- * @param useHttps does the bus use https
- * @param allowSelfSignedCerts are self-signed certificates allowed
- * @throws IllegalArgumentException An invalid parameter passed in
- */
- public SingleThreadedBusTopicSource(List<String> servers,
- String topic,
- String apiKey,
- String apiSecret,
- String consumerGroup,
- String consumerInstance,
- int fetchTimeout,
- int fetchLimit,
- boolean useHttps,
- boolean allowSelfSignedCerts) {
-
- super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
-
- if (consumerGroup == null || consumerGroup.isEmpty()) {
- this.consumerGroup = UUID.randomUUID ().toString();
- } else {
- this.consumerGroup = consumerGroup;
- }
-
- if (consumerInstance == null || consumerInstance.isEmpty()) {
- this.consumerInstance = NetworkUtil.getHostname();
- } else {
- this.consumerInstance = consumerInstance;
- }
-
- if (fetchTimeout <= 0) {
- this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
- } else {
- this.fetchTimeout = fetchTimeout;
- }
-
- if (fetchLimit <= 0) {
- this.fetchLimit = NO_LIMIT_FETCH;
- } else {
- this.fetchLimit = fetchLimit;
- }
-
- }
-
- /**
- * Initialize the Bus client
- */
- public abstract void init() throws MalformedURLException;
-
- @Override
- public void register(TopicListener topicListener) {
-
- super.register(topicListener);
-
- try {
- if (!alive && !locked)
- this.start();
- else
- logger.info("{}: register: start not attempted", this);
- } catch (Exception e) {
- logger.warn("{}: cannot start after registration of because of: {}",
- this, topicListener, e.getMessage(), e);
- }
- }
-
- @Override
- public void unregister(TopicListener topicListener) {
- boolean stop;
- synchronized (this) {
- super.unregister(topicListener);
- stop = this.topicListeners.isEmpty();
- }
-
- if (stop) {
- this.stop();
- }
- }
-
- @Override
- public boolean start() {
- logger.info("{}: starting", this);
-
- synchronized(this) {
-
- if (alive)
- return true;
-
- if (locked)
- throw new IllegalStateException(this + " is locked.");
-
- if (this.busPollerThread == null ||
- !this.busPollerThread.isAlive() ||
- this.consumer == null) {
-
- try {
- this.init();
- this.alive = true;
- this.busPollerThread = new Thread(this);
- this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
- busPollerThread.start();
- } catch (Exception e) {
- logger.warn("{}: cannot start because of {}", this, e.getMessage(), e);
- throw new IllegalStateException(e);
- }
- }
- }
-
- return this.alive;
- }
-
- @Override
- public boolean stop() {
- logger.info("{}: stopping", this);
-
- synchronized(this) {
- BusConsumer consumerCopy = this.consumer;
-
- this.alive = false;
- this.consumer = null;
-
- if (consumerCopy != null) {
- try {
- consumerCopy.close();
- } catch (Exception e) {
- logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
- }
- }
- }
-
- Thread.yield();
-
- return true;
- }
-
- /**
- * Run thread method for the Bus Reader
- */
- @Override
- public void run() {
- while (this.alive) {
- try {
- for (String event: this.consumer.fetch()) {
- synchronized (this) {
- this.recentEvents.add(event);
- }
-
- netLogger.info("[IN|{}|{}]{}{}",
- this.getTopicCommInfrastructure(), this.topic,
- System.lineSeparator(), event);
-
- broadcast(event);
-
- if (!this.alive)
- break;
- }
- } catch (Exception e) {
- logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
- }
- }
-
- logger.info("{}: exiting thread", this);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean offer(String event) {
- if (!this.alive) {
- throw new IllegalStateException(this + " is not alive.");
- }
-
- synchronized (this) {
- this.recentEvents.add(event);
- }
-
- netLogger.info("[IN|{}|{}]{}{}",this.getTopicCommInfrastructure(),this.topic,
- System.lineSeparator(), event);
-
-
- return broadcast(event);
- }
-
-
- @Override
- public void setFilter(String filter) {
- if(consumer instanceof FilterableBusConsumer) {
- ((FilterableBusConsumer) consumer).setFilter(filter);
-
- } else {
- throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
- }
- }
-
- @Override
- public String toString() {
- return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
- + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer="
- + this.consumer + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread
- + ", topicListeners=" + topicListeners.size() + ", toString()=" + super.toString() + "]";
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public String getConsumerGroup() {
- return consumerGroup;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public String getConsumerInstance() {
- return consumerInstance;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void shutdown() {
- this.stop();
- this.topicListeners.clear();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public int getFetchTimeout() {
- return fetchTimeout;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public int getFetchLimit() {
- return fetchLimit;
- }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
deleted file mode 100644
index 6a9a2d6d..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm.bus.internal;
-
-import java.net.MalformedURLException;
-import java.util.List;
-import java.util.Map;
-
-import org.onap.policy.drools.event.comm.Topic;
-import org.onap.policy.drools.event.comm.bus.DmaapTopicSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This topic reader implementation specializes in reading messages
- * over DMAAP topic and notifying its listeners
- */
-public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
- implements DmaapTopicSource, Runnable {
-
- private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class);
-
-
- protected final String userName;
- protected final String password;
-
- protected String environment = null;
- protected String aftEnvironment = null;
- protected String partner = null;
- protected String latitude = null;
- protected String longitude = null;
-
- protected Map<String,String> additionalProps = null;
-
-
- /**
- *
- * @param servers DMaaP servers
- * @param topic DMaaP Topic to be monitored
- * @param apiKey DMaaP API Key (optional)
- * @param apiSecret DMaaP API Secret (optional)
- * @param consumerGroup DMaaP Reader Consumer Group
- * @param consumerInstance DMaaP Reader Instance
- * @param fetchTimeout DMaaP fetch timeout
- * @param fetchLimit DMaaP fetch limit
- * @param environment DME2 Environment
- * @param aftEnvironment DME2 AFT Environment
- * @param partner DME2 Partner
- * @param latitude DME2 Latitude
- * @param longitude DME2 Longitude
- * @param additionalProps Additional properties to pass to DME2
- * @param useHttps does connection use HTTPS?
- * @param allowSelfSignedCerts are self-signed certificates allow
- *
- * @throws IllegalArgumentException An invalid parameter passed in
- */
- public SingleThreadedDmaapTopicSource(List<String> servers, String topic,
- String apiKey, String apiSecret,
- String userName, String password,
- String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit,
- String environment, String aftEnvironment, String partner,
- String latitude, String longitude, Map<String,String> additionalProps,
- boolean useHttps, boolean allowSelfSignedCerts) {
-
- super(servers, topic, apiKey, apiSecret,
- consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts);
-
- this.userName = userName;
- this.password = password;
-
- this.environment = environment;
- this.aftEnvironment = aftEnvironment;
- this.partner = partner;
-
- this.latitude = latitude;
- this.longitude = longitude;
-
- this.additionalProps = additionalProps;
- try {
- this.init();
- } catch (Exception e) {
- logger.error("ERROR during init of topic {}", this.topic);
- throw new IllegalArgumentException(e);
- }
- }
-
- /**
- *
- * @param servers DMaaP servers
- * @param topic DMaaP Topic to be monitored
- * @param apiKey DMaaP API Key (optional)
- * @param apiSecret DMaaP API Secret (optional)
- * @param consumerGroup DMaaP Reader Consumer Group
- * @param consumerInstance DMaaP Reader Instance
- * @param fetchTimeout DMaaP fetch timeout
- * @param fetchLimit DMaaP fetch limit
- * @param useHttps does connection use HTTPS?
- * @param allowSelfSignedCerts are self-signed certificates allow
- * @throws IllegalArgumentException An invalid parameter passed in
- */
- public SingleThreadedDmaapTopicSource(List<String> servers, String topic,
- String apiKey, String apiSecret,
- String userName, String password,
- String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) {
-
-
- super(servers, topic, apiKey, apiSecret,
- consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
-
- this.userName = userName;
- this.password = password;
-
- try {
- this.init();
- } catch (Exception e) {
- logger.warn("dmaap-source: cannot create topic {} because of {}", topic, e.getMessage(), e);
- throw new IllegalArgumentException(e);
- }
- }
-
-
- /**
- * Initialize the Cambria or MR Client
- */
- @Override
- public void init() throws MalformedURLException {
- if (anyNullOrEmpty(this.userName, this.password)) {
- this.consumer =
- new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic,
- this.apiKey, this.apiSecret,
- this.consumerGroup, this.consumerInstance,
- this.fetchTimeout, this.fetchLimit,
- this.useHttps, this.allowSelfSignedCerts);
- } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
- this.consumer =
- new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic,
- this.apiKey, this.apiSecret,
- this.userName, this.password,
- this.consumerGroup, this.consumerInstance,
- this.fetchTimeout, this.fetchLimit,
- this.useHttps, this.allowSelfSignedCerts);
- } else {
- this.consumer =
- new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic,
- this.apiKey, this.apiSecret,
- this.userName, this.password,
- this.consumerGroup, this.consumerInstance,
- this.fetchTimeout, this.fetchLimit,
- this.environment, this.aftEnvironment, this.partner,
- this.latitude, this.longitude, this.additionalProps, this.useHttps);
- }
-
- logger.info("{}: INITTED", this);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public CommInfrastructure getTopicCommInfrastructure() {
- return Topic.CommInfrastructure.DMAAP;
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=")
- .append((password == null || password.isEmpty()) ? "-" : password.length())
- .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure())
- .append(", toString()=").append(super.toString()).append("]");
- return builder.toString();
- }
-
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java
deleted file mode 100644
index fcbee631..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm.bus.internal;
-
-import java.util.List;
-
-import org.onap.policy.drools.event.comm.Topic;
-import org.onap.policy.drools.event.comm.bus.UebTopicSource;
-
-/**
- * This topic source implementation specializes in reading messages
- * over an UEB Bus topic source and notifying its listeners
- */
-public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource
- implements UebTopicSource {
-
- /**
- *
- * @param servers UEB servers
- * @param topic UEB Topic to be monitored
- * @param apiKey UEB API Key (optional)
- * @param apiSecret UEB API Secret (optional)
- * @param consumerGroup UEB Reader Consumer Group
- * @param consumerInstance UEB Reader Instance
- * @param fetchTimeout UEB fetch timeout
- * @param fetchLimit UEB fetch limit
- * @param useHttps does topicSource use HTTPS?
- * @param allowSelfSignedCerts does topicSource allow self-signed certs?
- *
- * @throws IllegalArgumentException An invalid parameter passed in
- */
-
-
- public SingleThreadedUebTopicSource(List<String> servers, String topic,
- String apiKey, String apiSecret,
- String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) {
-
- super(servers, topic, apiKey, apiSecret,
- consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
-
- this.allowSelfSignedCerts = allowSelfSignedCerts;
-
- this.init();
- }
-
- /**
- * Initialize the Cambria client
- */
- @Override
- public void init() {
- this.consumer =
- new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic,
- this.apiKey, this.apiSecret,
- this.consumerGroup, this.consumerInstance,
- this.fetchTimeout, this.fetchLimit, this.useHttps, this.allowSelfSignedCerts);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public CommInfrastructure getTopicCommInfrastructure() {
- return Topic.CommInfrastructure.UEB;
- }
-
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("SingleThreadedUebTopicSource [getTopicCommInfrastructure()=")
- .append(getTopicCommInfrastructure()).append(", toString()=").append(super.toString()).append("]");
- return builder.toString();
- }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/TopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/TopicBase.java
deleted file mode 100644
index 22c6b1d5..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/TopicBase.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.event.comm.bus.internal;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.collections4.queue.CircularFifoQueue;
-import org.onap.policy.drools.event.comm.Topic;
-import org.onap.policy.drools.event.comm.TopicListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class TopicBase implements Topic {
-
- /**
- * logger
- */
- private static Logger logger = LoggerFactory.getLogger(TopicBase.class);
-
- /**
- * list of servers
- */
- protected List<String> servers;
-
- /**
- * Topic
- */
- protected String topic;
-
- /**
- * event cache
- */
- protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<>(10);
-
- /**
- * Am I running?
- * reflects invocation of start()/stop()
- * !locked & start() => alive
- * stop() => !alive
- */
- protected volatile boolean alive = false;
-
- /**
- * Am I locked?
- * reflects invocation of lock()/unlock() operations
- * locked => !alive (but not in the other direction necessarily)
- * locked => !offer, !run, !start, !stop (but this last one is obvious
- * since locked => !alive)
- */
- protected volatile boolean locked = false;
-
- /**
- * All my subscribers for new message notifications
- */
- protected final ArrayList<TopicListener> topicListeners = new ArrayList<>();
-
- /**
- * Instantiates a new Topic Base
- *
- * @param servers list of servers
- * @param topic topic name
- *
- * @return a Topic Base
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public TopicBase(List<String> servers, String topic) {
-
- if (servers == null || servers.isEmpty()) {
- throw new IllegalArgumentException("Server(s) must be provided");
- }
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException("A Topic must be provided");
- }
-
- this.servers = servers;
- this.topic = topic;
- }
-
- @Override
- public void register(TopicListener topicListener) {
-
- logger.info("{}: registering {}", this, topicListener);
-
- synchronized(this) {
- if (topicListener == null)
- throw new IllegalArgumentException("TopicListener must be provided");
-
- for (TopicListener listener: this.topicListeners) {
- if (listener == topicListener) return;
- }
-
- this.topicListeners.add(topicListener);
- }
- }
-
- @Override
- public void unregister(TopicListener topicListener) {
-
- logger.info("{}: unregistering {}", this, topicListener);
-
- synchronized (this) {
- if (topicListener == null)
- throw new IllegalArgumentException("TopicListener must be provided");
-
- this.topicListeners.remove(topicListener);
- }
- }
-
- /**
- * broadcast event to all listeners
- *
- * @param message the event
- * @return true if all notifications are performed with no error, false otherwise
- */
- protected boolean broadcast(String message) {
- List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
-
- boolean success = true;
- for (TopicListener topicListener: snapshotListeners) {
- try {
- topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
- } catch (Exception e) {
- logger.warn("{}: notification error @ {} because of {}",
- this, topicListener, e.getMessage(), e);
- success = false;
- }
- }
- return success;
- }
-
- /**
- * take a snapshot of current topic listeners
- *
- * @return the topic listeners
- */
- protected synchronized List<TopicListener> snapshotTopicListeners() {
- @SuppressWarnings("unchecked")
- List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone();
- return listeners;
- }
-
- @Override
- public boolean lock() {
-
- logger.info("{}: locking", this);
-
- synchronized (this) {
- if (this.locked)
- return true;
-
- this.locked = true;
- }
-
- return this.stop();
- }
-
- @Override
- public boolean unlock() {
- logger.info("{}: unlocking", this);
-
- synchronized(this) {
- if (!this.locked)
- return true;
-
- this.locked = false;
- }
-
- try {
- return this.start();
- } catch (Exception e) {
- logger.warn("{}: cannot after unlocking because of {}", this, e.getMessage(), e);
- return false;
- }
- }
-
- @Override
- public boolean isLocked() {
- return this.locked;
- }
-
- @Override
- public String getTopic() {
- return topic;
- }
-
- @Override
- public boolean isAlive() {
- return this.alive;
- }
-
- @Override
- public List<String> getServers() {
- return servers;
- }
-
- @Override
- public synchronized String[] getRecentEvents() {
- String[] events = new String[recentEvents.size()];
- return recentEvents.toArray(events);
- }
-
-
- @Override
- public String toString() {
- return "TopicBase [servers=" + servers + ", topic=" + topic + ", #recentEvents=" + recentEvents.size() + ", locked="
- + locked + ", #topicListeners=" + topicListeners.size() + "]";
- }
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/http/client/HttpClient.java b/policy-endpoints/src/main/java/org/onap/policy/drools/http/client/HttpClient.java
deleted file mode 100644
index e5becdf4..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/http/client/HttpClient.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.policy.drools.http.client;
-
-import javax.ws.rs.core.Response;
-
-import org.onap.policy.drools.properties.Startable;
-
-public interface HttpClient extends Startable {
-
- public Response get(String path);
-
- public Response get();
-
- public static <T> T getBody(Response response, Class<T> entityType) {
- return response.readEntity(entityType);
- }
-
- public String getName();
- public boolean isHttps();
- public boolean isSelfSignedCerts();
- public String getHostname();
- public int getPort();
- public String getBasePath();
- public String getUserName();
- public String getPassword();
- public String getBaseUrl();
-
-
- public static final HttpClientFactory factory = new IndexedHttpClientFactory();
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/http/client/HttpClientFactory.java b/policy-endpoints/src/main/java/org/onap/policy/drools/http/client/HttpClientFactory.java
deleted file mode 100644
index 1094a2fb..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/http/client/HttpClientFactory.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.policy.drools.http.client;
-
-import java.security.KeyManagementException;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-
-import org.onap.policy.drools.http.client.internal.JerseyClient;
-import org.onap.policy.drools.properties.PolicyProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Http Client Factory
- */
-public interface HttpClientFactory {
-
- /**
- * build and http client with the following parameters
- */
- public HttpClient build(String name, boolean https,
- boolean selfSignedCerts,
- String hostname, int port,
- String baseUrl, String userName,
- String password, boolean managed)
- throws KeyManagementException, NoSuchAlgorithmException;
-
- /**
- * build http client from properties
- */
- public List<HttpClient> build(Properties properties)
- throws KeyManagementException, NoSuchAlgorithmException;
-
- /**
- * get http client
- * @param name the name
- * @return the http client
- */
- public HttpClient get(String name);
-
- /**
- * list of http clients
- * @return http clients
- */
- public List<HttpClient> inventory();
-
- /**
- * destroy by name
- * @param name name
- */
- public void destroy(String name);
-
- public void destroy();
-}
-
-/**
- * http client factory implementation indexed by name
- */
-class IndexedHttpClientFactory implements HttpClientFactory {
-
- /**
- * Logger
- */
- private static Logger logger = LoggerFactory.getLogger(IndexedHttpClientFactory.class);
-
- protected HashMap<String, HttpClient> clients = new HashMap<>();
-
- @Override
- public synchronized HttpClient build(String name, boolean https, boolean selfSignedCerts,
- String hostname, int port,
- String baseUrl, String userName, String password,
- boolean managed)
- throws KeyManagementException, NoSuchAlgorithmException {
- if (clients.containsKey(name))
- return clients.get(name);
-
- JerseyClient client =
- new JerseyClient(name, https, selfSignedCerts, hostname, port, baseUrl, userName, password);
-
- if (managed)
- clients.put(name, client);
-
- return client;
- }
-
- @Override
- public synchronized List<HttpClient> build(Properties properties)
- throws KeyManagementException, NoSuchAlgorithmException {
- ArrayList<HttpClient> clientList = new ArrayList<>();
-
- String clientNames = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES);
- if (clientNames == null || clientNames.isEmpty()) {
- return clientList;
- }
-
- List<String> clientNameList =
- new ArrayList<>(Arrays.asList(clientNames.split("\\s*,\\s*")));
-
- for (String clientName : clientNameList) {
- String httpsString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." +
- clientName +
- PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
- boolean https = false;
- if (httpsString != null && !httpsString.isEmpty()) {
- https = Boolean.parseBoolean(httpsString);
- }
-
- String hostName = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." +
- clientName +
- PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX);
-
- String servicePortString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." +
- clientName +
- PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX);
- int port;
- try {
- if (servicePortString == null || servicePortString.isEmpty()) {
- continue;
- }
- port = Integer.parseInt(servicePortString);
- } catch (NumberFormatException nfe) {
- logger.error("http-client-factory: cannot parse port {}", servicePortString, nfe);
- continue;
- }
-
- String baseUrl = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." +
- clientName +
- PolicyProperties.PROPERTY_HTTP_URL_SUFFIX);
-
- String userName = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." +
- clientName +
- PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX);
-
- String password = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." +
- clientName +
- PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX);
-
- String managedString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." +
- clientName +
- PolicyProperties.PROPERTY_MANAGED_SUFFIX);
- boolean managed = true;
- if (managedString != null && !managedString.isEmpty()) {
- managed = Boolean.parseBoolean(managedString);
- }
-
- try {
- HttpClient client =
- this.build(clientName, https, https, hostName, port, baseUrl,
- userName, password, managed);
- clientList.add(client);
- } catch (Exception e) {
- logger.error("http-client-factory: cannot build client {}", clientName, e);
- }
- }
-
- return clientList;
- }
-
- @Override
- public synchronized HttpClient get(String name) {
- if (clients.containsKey(name)) {
- return clients.get(name);
- }
-
- throw new IllegalArgumentException("Http Client " + name + " not found");
- }
-
- @Override
- public synchronized List<HttpClient> inventory() {
- return new ArrayList<>(this.clients.values());
- }
-
- @Override
- public synchronized void destroy(String name) {
- if (!clients.containsKey(name)) {
- return;
- }
-
- HttpClient client = clients.remove(name);
- try {
- client.shutdown();
- } catch (IllegalStateException e) {
- logger.error("http-client-factory: cannot shutdown client {}", client, e);
- }
- }
-
- @Override
- public void destroy() {
- List<HttpClient> clientsInventory = this.inventory();
- for (HttpClient client: clientsInventory) {
- client.shutdown();
- }
-
- synchronized(this) {
- this.clients.clear();
- }
- }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/http/client/internal/JerseyClient.java b/policy-endpoints/src/main/java/org/onap/policy/drools/http/client/internal/JerseyClient.java
deleted file mode 100644
index 6a254e2e..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/http/client/internal/JerseyClient.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.http.client.internal;
-
-import java.security.KeyManagementException;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.X509TrustManager;
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.core.Response;
-import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
-import org.onap.policy.drools.http.client.HttpClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-public class JerseyClient implements HttpClient {
-
- /**
- * Logger
- */
- private static Logger logger = LoggerFactory.getLogger(JerseyClient.class);
-
- protected final String name;
- protected final boolean https;
- protected final boolean selfSignedCerts;
- protected final String hostname;
- protected final int port;
- protected final String basePath;
- protected final String userName;
- protected final String password;
-
- protected final Client client;
- protected final String baseUrl;
-
- protected boolean alive = true;
-
-
- public JerseyClient(String name, boolean https,
- boolean selfSignedCerts,
- String hostname, int port,
- String basePath, String userName,
- String password)
- throws KeyManagementException, NoSuchAlgorithmException {
-
- super();
-
- if (name == null || name.isEmpty())
- throw new IllegalArgumentException("Name must be provided");
-
- if (hostname == null || hostname.isEmpty())
- throw new IllegalArgumentException("Hostname must be provided");
-
- if (port <= 0 && port >= 65535)
- throw new IllegalArgumentException("Invalid Port provided: " + port);
-
- this.name = name;
- this.https = https;
- this.hostname = hostname;
- this.port = port;
- this.basePath = basePath;
- this.userName = userName;
- this.password = password;
- this.selfSignedCerts = selfSignedCerts;
-
- StringBuilder tmpBaseUrl = new StringBuilder();
- if (this.https) {
- tmpBaseUrl.append("https://");
- ClientBuilder clientBuilder;
- SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
- if (this.selfSignedCerts) {
- sslContext.init(null, new TrustManager[]{new X509TrustManager() {
- @Override
- public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
- // always trusted
- }
- @Override
- public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
- // always trusted
- }
- @Override
- public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; }
-
- }}, new SecureRandom());
- clientBuilder = ClientBuilder.newBuilder().sslContext(sslContext).hostnameVerifier((host,session) -> true);
- } else {
- sslContext.init(null, null, null);
- clientBuilder = ClientBuilder.newBuilder().sslContext(sslContext);
- }
- this.client = clientBuilder.build();
- } else {
- tmpBaseUrl.append("http://");
- this.client = ClientBuilder.newClient();
- }
-
- if (this.userName != null && !this.userName.isEmpty() &&
- this.password != null && !this.password.isEmpty()) {
- HttpAuthenticationFeature authFeature = HttpAuthenticationFeature.basic(userName, password);
- this.client.register(authFeature);
- }
-
- this.baseUrl = tmpBaseUrl.append(this.hostname).append(":").
- append(this.port).append("/").
- append((this.basePath == null) ? "" : this.basePath).
- toString();
- }
-
- @Override
- public Response get(String path) {
- if (path != null && !path.isEmpty())
- return this.client.target(this.baseUrl).path(path).request().get();
- else
- return this.client.target(this.baseUrl).request().get();
- }
-
- @Override
- public Response get() {
- return this.client.target(this.baseUrl).request().get();
- }
-
-
- @Override
- public boolean start() {
- return alive;
- }
-
- @Override
- public boolean stop() {
- return !alive;
- }
-
- @Override
- public void shutdown() {
- synchronized(this) {
- alive = false;
- }
-
- try {
- this.client.close();
- } catch (Exception e) {
- logger.warn("{}: cannot close because of {}", this,
- e.getMessage(), e);
- }
- }
-
- @Override
- public synchronized boolean isAlive() {
- return this.alive;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public boolean isHttps() {
- return https;
- }
-
- @Override
- public boolean isSelfSignedCerts() {
- return selfSignedCerts;
- }
-
- @Override
- public String getHostname() {
- return hostname;
- }
-
- @Override
- public int getPort() {
- return port;
- }
-
- @Override
- public String getBasePath() {
- return basePath;
- }
-
- @Override
- public String getUserName() {
- return userName;
- }
-
- @JsonIgnore
- @Override
- public String getPassword() {
- return password;
- }
-
- @Override
- public String getBaseUrl() {
- return baseUrl;
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("JerseyClient [name=");
- builder.append(name);
- builder.append(", https=");
- builder.append(https);
- builder.append(", selfSignedCerts=");
- builder.append(selfSignedCerts);
- builder.append(", hostname=");
- builder.append(hostname);
- builder.append(", port=");
- builder.append(port);
- builder.append(", basePath=");
- builder.append(basePath);
- builder.append(", userName=");
- builder.append(userName);
- builder.append(", password=");
- builder.append(password);
- builder.append(", client=");
- builder.append(client);
- builder.append(", baseUrl=");
- builder.append(baseUrl);
- builder.append(", alive=");
- builder.append(alive);
- builder.append("]");
- return builder.toString();
- }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/HttpServletServer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/HttpServletServer.java
deleted file mode 100644
index 3cd702ae..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/HttpServletServer.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.policy.drools.http.server;
-
-import org.onap.policy.drools.properties.Startable;
-
-/**
- * A Jetty Server to server REST Requests
- */
-public interface HttpServletServer extends Startable {
-
-
- /**
- * factory for managing and tracking DMAAP sources
- */
- public static HttpServletServerFactory factory = new IndexedHttpServletServerFactory();
-
- /**
- *
- * @return port
- */
- public int getPort();
-
- /**
- * enables basic authentication with user and password on the the relative path relativeUriPath
- *
- * @param user
- * @param password
- * @param relativeUriPath
- */
- public void setBasicAuthentication(String user, String password, String relativeUriPath);
-
- /**
- * adds a JAX-RS servlet class to serve REST requests
- *
- * @param servletPath servlet path
- * @param restClass JAX-RS API Class
- *
- * @throws IllegalArgumentException unable to process because of invalid input
- * @throws IllegalStateException unable to process because of invalid state
- */
- public void addServletClass(String servletPath, String restClass);
-
- /**
- * adds a package containing JAX-RS classes to serve REST requests
- *
- * @param servletPath servlet path
- * @param restPackage JAX-RS package to scan
- *
- * @throws IllegalArgumentException unable to process because of invalid input
- * @throws IllegalStateException unable to process because of invalid state
- */
- public void addServletPackage(String servletPath, String restPackage);
-
- /**
- * blocking start of the http server
- *
- * @param maxWaitTime max time to wait for the start to take place
- * @return true if start was successful
- *
- * @throws IllegalArgumentException if arguments are invalid
- * @throws InterruptedException if the blocking operation is interrupted
- */
- public boolean waitedStart(long maxWaitTime) throws InterruptedException;
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/HttpServletServerFactory.java b/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/HttpServletServerFactory.java
deleted file mode 100644
index f4dc85bc..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/HttpServletServerFactory.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.policy.drools.http.server;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
-import org.onap.policy.drools.http.server.internal.JettyJerseyServer;
-import org.onap.policy.drools.properties.PolicyProperties;
-
-/**
- * Factory of HTTP Servlet-Enabled Servlets
- */
-public interface HttpServletServerFactory {
-
- /**
- * builds an http server with support for servlets
- *
- * @param name name
- * @param host binding host
- * @param port port
- * @param contextPath server base path
- * @param swagger enable swagger documentation
- * @param managed is it managed by infrastructure
- * @return http server
- * @throws IllegalArgumentException when invalid parameters are provided
- */
- public HttpServletServer build(String name, String host, int port, String contextPath,
- boolean swagger, boolean managed);
-
- /**
- * list of http servers per properties
- *
- * @param properties properties based configuration
- * @return list of http servers
- * @throws IllegalArgumentException when invalid parameters are provided
- */
- public List<HttpServletServer> build(Properties properties);
-
- /**
- * gets a server based on the port
- *
- * @param port port
- * @return http server
- */
- public HttpServletServer get(int port);
-
- /**
- * provides an inventory of servers
- *
- * @return inventory of servers
- */
- public List<HttpServletServer> inventory();
-
- /**
- * destroys server bound to a port
- * @param port
- */
- public void destroy(int port);
-
- /**
- * destroys the factory and therefore all servers
- */
- public void destroy();
-}
-
-/**
- * Indexed factory implementation
- */
-class IndexedHttpServletServerFactory implements HttpServletServerFactory {
-
- private static final String SPACES_COMMA_SPACES = "\\s*,\\s*";
-
- /**
- * logger
- */
- protected static Logger logger = LoggerFactory.getLogger(IndexedHttpServletServerFactory.class);
-
- /**
- * servers index
- */
- protected HashMap<Integer, HttpServletServer> servers = new HashMap<>();
-
- @Override
- public synchronized HttpServletServer build(String name, String host, int port,
- String contextPath, boolean swagger,
- boolean managed) {
-
- if (servers.containsKey(port))
- return servers.get(port);
-
- JettyJerseyServer server = new JettyJerseyServer(name, host, port, contextPath, swagger);
- if (managed)
- servers.put(port, server);
-
- return server;
- }
-
- @Override
- public synchronized List<HttpServletServer> build(Properties properties) {
-
- ArrayList<HttpServletServer> serviceList = new ArrayList<>();
-
- String serviceNames = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES);
- if (serviceNames == null || serviceNames.isEmpty()) {
- logger.warn("No topic for HTTP Service: {}", properties);
- return serviceList;
- }
-
- List<String> serviceNameList = Arrays.asList(serviceNames.split(SPACES_COMMA_SPACES));
-
- for (String serviceName : serviceNameList) {
- String servicePortString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." +
- serviceName +
- PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX);
-
- int servicePort;
- try {
- if (servicePortString == null || servicePortString.isEmpty()) {
- if (logger.isWarnEnabled())
- logger.warn("No HTTP port for service in {}", serviceName);
- continue;
- }
- servicePort = Integer.parseInt(servicePortString);
- } catch (NumberFormatException nfe) {
- if (logger.isWarnEnabled())
- logger.warn("No HTTP port for service in {}", serviceName);
- continue;
- }
-
- String hostName = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." +
- serviceName +
- PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX);
-
- String contextUriPath = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." +
- serviceName +
- PolicyProperties.PROPERTY_HTTP_CONTEXT_URIPATH_SUFFIX);
-
- String userName = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." +
- serviceName +
- PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX);
-
- String password = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." +
- serviceName +
- PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX);
-
- String authUriPath = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." +
- serviceName +
- PolicyProperties.PROPERTY_HTTP_AUTH_URIPATH_SUFFIX);
-
- String restClasses = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." +
- serviceName +
- PolicyProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX);
-
- String restPackages = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." +
- serviceName +
- PolicyProperties.PROPERTY_HTTP_REST_PACKAGES_SUFFIX);
- String restUriPath = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." +
- serviceName +
- PolicyProperties.PROPERTY_HTTP_REST_URIPATH_SUFFIX);
-
- String managedString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." +
- serviceName +
- PolicyProperties.PROPERTY_MANAGED_SUFFIX);
- boolean managed = true;
- if (managedString != null && !managedString.isEmpty()) {
- managed = Boolean.parseBoolean(managedString);
- }
-
- String swaggerString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." +
- serviceName +
- PolicyProperties.PROPERTY_HTTP_SWAGGER_SUFFIX);
- boolean swagger = false;
- if (swaggerString != null && !swaggerString.isEmpty()) {
- swagger = Boolean.parseBoolean(swaggerString);
- }
-
- HttpServletServer service = build(serviceName, hostName, servicePort, contextUriPath, swagger, managed);
- if (userName != null && !userName.isEmpty() && password != null && !password.isEmpty()) {
- service.setBasicAuthentication(userName, password, authUriPath);
- }
-
- if (restClasses != null && !restClasses.isEmpty()) {
- List<String> restClassesList = Arrays.asList(restClasses.split(SPACES_COMMA_SPACES));
- for (String restClass : restClassesList)
- service.addServletClass(restUriPath, restClass);
- }
-
- if (restPackages != null && !restPackages.isEmpty()) {
- List<String> restPackageList = Arrays.asList(restPackages.split(SPACES_COMMA_SPACES));
- for (String restPackage : restPackageList)
- service.addServletPackage(restUriPath, restPackage);
- }
-
- serviceList.add(service);
- }
-
- return serviceList;
- }
-
- @Override
- public synchronized HttpServletServer get(int port) {
-
- if (servers.containsKey(port)) {
- return servers.get(port);
- }
-
- throw new IllegalArgumentException("Http Server for " + port + " not found");
- }
-
- @Override
- public synchronized List<HttpServletServer> inventory() {
- return new ArrayList<>(this.servers.values());
- }
-
- @Override
- public synchronized void destroy(int port) {
-
- if (!servers.containsKey(port)) {
- return;
- }
-
- HttpServletServer server = servers.remove(port);
- server.shutdown();
- }
-
- @Override
- public synchronized void destroy() {
- List<HttpServletServer> httpServletServers = this.inventory();
- for (HttpServletServer server: httpServletServers) {
- server.shutdown();
- }
-
- synchronized(this) {
- this.servers.clear();
- }
- }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/internal/JettyJerseyServer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/internal/JettyJerseyServer.java
deleted file mode 100644
index 0cbd983d..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/internal/JettyJerseyServer.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.policy.drools.http.server.internal;
-
-import java.util.HashMap;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.onap.policy.drools.utils.NetworkUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import io.swagger.jersey.config.JerseyJaxrsConfig;
-
-/**
- * REST Jetty Server that uses Jersey Servlets to support JAX-RS Web Services
- */
-public class JettyJerseyServer extends JettyServletServer {
-
- /**
- * Swagger API Base Path
- */
- protected static final String SWAGGER_API_BASEPATH = "swagger.api.basepath";
-
- /**
- * Swagger Context ID
- */
- protected static final String SWAGGER_CONTEXT_ID = "swagger.context.id";
-
- /**
- * Swagger Scanner ID
- */
- protected static final String SWAGGER_SCANNER_ID = "swagger.scanner.id";
-
- /**
- * Swagger Pretty Print
- */
- protected static final String SWAGGER_PRETTY_PRINT = "swagger.pretty.print";
-
- /**
- * Swagger Packages
- */
- protected static final String SWAGGER_INIT_PACKAGES_PARAM_VALUE = "io.swagger.jaxrs.listing";
-
- /**
- * Jersey Packages Init Param Name
- */
- protected static final String JERSEY_INIT_PACKAGES_PARAM_NAME = "jersey.config.server.provider.packages";
-
- /**
- * Jersey Packages Init Param Value
- */
- protected static final String JERSEY_INIT_PACKAGES_PARAM_VALUE = "com.fasterxml.jackson.jaxrs.json";
-
- /**
- * Jersey Classes Init Param Name
- */
- protected static final String JERSEY_INIT_CLASSNAMES_PARAM_NAME = "jersey.config.server.provider.classnames";
-
- /**
- * Jersey Jackson Classes Init Param Value
- */
- protected static final String JERSEY_JACKSON_INIT_CLASSNAMES_PARAM_VALUE = "com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider";
-
- /**
- * Jersey Swagger Classes Init Param Value
- */
- protected static final String SWAGGER_INIT_CLASSNAMES_PARAM_VALUE = "io.swagger.jaxrs.listing.ApiListingResource," +
- "io.swagger.jaxrs.listing.SwaggerSerializers";
- /**
- * Logger
- */
- protected static Logger logger = LoggerFactory.getLogger(JettyJerseyServer.class);
-
- /**
- * Container for servlets
- */
- protected HashMap<String, ServletHolder> servlets = new HashMap<>();
-
- /**
- * Swagger ID
- */
- protected String swaggerId = null;
-
- /**
- * Constructor
- *
- * @param name name
- * @param host host server host
- * @param port port server port
- * @param swagger support swagger?
- * @param contextPath context path
- *
- * @throws IllegalArgumentException in invalid arguments are provided
- */
- public JettyJerseyServer(String name, String host, int port, String contextPath, boolean swagger) {
-
- super(name, host, port, contextPath);
- if (swagger) {
- this.swaggerId = "swagger-" + this.port;
- attachSwaggerServlet();
- }
- }
-
- /**
- * attaches a swagger initialization servlet
- */
- protected void attachSwaggerServlet() {
-
- ServletHolder swaggerServlet = context.addServlet(JerseyJaxrsConfig.class, "/");
-
- String hostname = this.connector.getHost();
- if (hostname == null || hostname.isEmpty() || hostname.equals(NetworkUtil.IPv4_WILDCARD_ADDRESS)) {
- hostname = NetworkUtil.getHostname();
- }
-
- swaggerServlet.setInitParameter(SWAGGER_API_BASEPATH,
- "http://" + hostname + ":" + this.connector.getPort() + "/");
- swaggerServlet.setInitParameter(SWAGGER_CONTEXT_ID, swaggerId);
- swaggerServlet.setInitParameter(SWAGGER_SCANNER_ID, swaggerId);
- swaggerServlet.setInitParameter(SWAGGER_PRETTY_PRINT, "true");
- swaggerServlet.setInitOrder(2);
-
- if (logger.isDebugEnabled())
- logger.debug("{}: Swagger Servlet has been attached: {}", this, swaggerServlet.dump());
- }
-
- /**
- * retrieves cached server based on servlet path
- *
- * @param servletPath servlet path
- * @return the jetty servlet holder
- *
- * @throws IllegalArgumentException if invalid arguments are provided
- */
- protected synchronized ServletHolder getServlet(String servletPath) {
-
- ServletHolder jerseyServlet = servlets.get(servletPath);
- if (jerseyServlet == null) {
- jerseyServlet = context.addServlet
- (org.glassfish.jersey.servlet.ServletContainer.class, servletPath);
- jerseyServlet.setInitOrder(0);
- servlets.put(servletPath, jerseyServlet);
- }
-
- return jerseyServlet;
- }
-
- @Override
- public synchronized void addServletPackage(String servletPath, String restPackage) {
- String servPath = servletPath;
- if (restPackage == null || restPackage.isEmpty())
- throw new IllegalArgumentException("No discoverable REST package provided");
-
- if (servPath == null || servPath.isEmpty())
- servPath = "/*";
-
- ServletHolder jerseyServlet = this.getServlet(servPath);
-
- String initClasses =
- jerseyServlet.getInitParameter(JERSEY_INIT_CLASSNAMES_PARAM_NAME);
- if (initClasses != null && !initClasses.isEmpty())
- logger.warn("Both packages and classes are used in Jetty+Jersey Configuration: {}", restPackage);
-
- String initPackages =
- jerseyServlet.getInitParameter(JERSEY_INIT_PACKAGES_PARAM_NAME);
- if (initPackages == null) {
- if (this.swaggerId != null) {
- initPackages = JERSEY_INIT_PACKAGES_PARAM_VALUE + "," +
- SWAGGER_INIT_PACKAGES_PARAM_VALUE + "," +
- restPackage;
-
- jerseyServlet.setInitParameter(SWAGGER_CONTEXT_ID, swaggerId);
- jerseyServlet.setInitParameter(SWAGGER_SCANNER_ID, swaggerId);
- } else {
- initPackages = JERSEY_INIT_PACKAGES_PARAM_VALUE + "," +
- restPackage;
- }
- } else {
- initPackages = initPackages + "," + restPackage;
- }
-
- jerseyServlet.setInitParameter(JERSEY_INIT_PACKAGES_PARAM_NAME, initPackages);
-
- if (logger.isDebugEnabled())
- logger.debug("{}: added REST package: {}", this, jerseyServlet.dump());
- }
-
- @Override
- public synchronized void addServletClass(String servletPath, String restClass) {
-
- if (restClass == null || restClass.isEmpty())
- throw new IllegalArgumentException("No discoverable REST class provided");
-
- if (servletPath == null || servletPath.isEmpty())
- servletPath = "/*";
-
- ServletHolder jerseyServlet = this.getServlet(servletPath);
-
- String initPackages =
- jerseyServlet.getInitParameter(JERSEY_INIT_PACKAGES_PARAM_NAME);
- if (initPackages != null && !initPackages.isEmpty())
- logger.warn("Both classes and packages are used in Jetty+Jersey Configuration: {}", restClass);
-
- String initClasses =
- jerseyServlet.getInitParameter(JERSEY_INIT_CLASSNAMES_PARAM_NAME);
- if (initClasses == null) {
- if (this.swaggerId != null) {
- initClasses = JERSEY_JACKSON_INIT_CLASSNAMES_PARAM_VALUE + "," +
- SWAGGER_INIT_CLASSNAMES_PARAM_VALUE + "," +
- restClass;
-
- jerseyServlet.setInitParameter(SWAGGER_CONTEXT_ID, swaggerId);
- jerseyServlet.setInitParameter(SWAGGER_SCANNER_ID, swaggerId);
- } else {
- initClasses = JERSEY_JACKSON_INIT_CLASSNAMES_PARAM_VALUE + "," + restClass;
- }
- } else {
- initClasses = initClasses + "," + restClass;
- }
-
- jerseyServlet.setInitParameter(JERSEY_INIT_CLASSNAMES_PARAM_NAME, initClasses);
-
- if (logger.isDebugEnabled())
- logger.debug("{}: added REST class: {}", this, jerseyServlet.dump());
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("JettyJerseyServer [servlets=").append(servlets).append(", swaggerId=").append(swaggerId)
- .append(", toString()=").append(super.toString()).append("]");
- return builder.toString();
- }
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/internal/JettyServletServer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/internal/JettyServletServer.java
deleted file mode 100644
index 08c62445..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/http/server/internal/JettyServletServer.java
+++ /dev/null
@@ -1,388 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.policy.drools.http.server.internal;
-
-import org.eclipse.jetty.security.ConstraintMapping;
-import org.eclipse.jetty.security.ConstraintSecurityHandler;
-import org.eclipse.jetty.security.HashLoginService;
-import org.eclipse.jetty.security.authentication.BasicAuthenticator;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ServerConnector;
-import org.eclipse.jetty.server.Slf4jRequestLog;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.util.security.Constraint;
-import org.eclipse.jetty.util.security.Credential;
-import org.onap.policy.drools.http.server.HttpServletServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-/**
- * Http Server implementation using Embedded Jetty
- */
-public abstract class JettyServletServer implements HttpServletServer, Runnable {
-
- /**
- * Logger
- */
- private static Logger logger = LoggerFactory.getLogger(JettyServletServer.class);
-
- /**
- * server name
- */
- protected final String name;
-
- /**
- * server host address
- */
- protected final String host;
-
- /**
- * server port to bind
- */
- protected final int port;
-
- /**
- * server auth user name
- */
- protected String user;
-
- /**
- * server auth password name
- */
- protected String password;
-
- /**
- * server base context path
- */
- protected final String contextPath;
-
- /**
- * embedded jetty server
- */
- protected final Server jettyServer;
-
- /**
- * servlet context
- */
- protected final ServletContextHandler context;
-
- /**
- * jetty connector
- */
- protected final ServerConnector connector;
-
- /**
- * jetty thread
- */
- protected volatile Thread jettyThread;
-
- /**
- * start condition
- */
- protected Object startCondition = new Object();
-
- /**
- * constructor
- *
- * @param name server name
- * @param host server host
- * @param port server port
- * @param contextPath context path
- *
- * @throws IllegalArgumentException if invalid parameters are passed in
- */
- public JettyServletServer(String name, String host, int port, String contextPath) {
- String srvName = name;
- String srvHost = host;
- String ctxtPath = contextPath;
-
- if (srvName == null || srvName.isEmpty())
- srvName = "http-" + port;
-
- if (port <= 0 && port >= 65535)
- throw new IllegalArgumentException("Invalid Port provided: " + port);
-
- if (srvHost == null || srvHost.isEmpty())
- srvHost = "localhost";
-
- if (ctxtPath == null || ctxtPath.isEmpty())
- ctxtPath = "/";
-
- this.name = srvName;
-
- this.host = srvHost;
- this.port = port;
-
- this.contextPath = ctxtPath;
-
- this.context = new ServletContextHandler(ServletContextHandler.SESSIONS);
- this.context.setContextPath(ctxtPath);
-
- this.jettyServer = new Server();
- this.jettyServer.setRequestLog(new Slf4jRequestLog());
-
- this.connector = new ServerConnector(this.jettyServer);
- this.connector.setName(srvName);
- this.connector.setReuseAddress(true);
- this.connector.setPort(port);
- this.connector.setHost(srvHost);
-
- this.jettyServer.addConnector(this.connector);
- this.jettyServer.setHandler(context);
- }
-
- @Override
- public void setBasicAuthentication(String user, String password, String servletPath) {
- String srvltPath = servletPath;
-
- if (user == null || user.isEmpty() || password == null || password.isEmpty())
- throw new IllegalArgumentException("Missing user and/or password");
-
- if (srvltPath == null || srvltPath.isEmpty())
- srvltPath = "/*";
-
- HashLoginService hashLoginService = new HashLoginService();
- hashLoginService.putUser(user,
- Credential.getCredential(password),
- new String[] {"user"});
- hashLoginService.setName(this.connector.getName() + "-login-service");
-
- Constraint constraint = new Constraint();
- constraint.setName(Constraint.__BASIC_AUTH);
- constraint.setRoles(new String[]{"user"});
- constraint.setAuthenticate(true);
-
- ConstraintMapping constraintMapping = new ConstraintMapping();
- constraintMapping.setConstraint(constraint);
- constraintMapping.setPathSpec(srvltPath);
-
- ConstraintSecurityHandler securityHandler = new ConstraintSecurityHandler();
- securityHandler.setAuthenticator(new BasicAuthenticator());
- securityHandler.setRealmName(this.connector.getName() + "-realm");
- securityHandler.addConstraintMapping(constraintMapping);
- securityHandler.setLoginService(hashLoginService);
-
- this.context.setSecurityHandler(securityHandler);
-
- this.user = user;
- this.password = password;
- }
-
- /**
- * jetty server execution
- */
- @Override
- public void run() {
- try {
- logger.info("{}: STARTING", this);
-
- this.jettyServer.start();
-
- if (logger.isInfoEnabled())
- logger.info("{}: STARTED: {}", this, this.jettyServer.dump());
-
- synchronized(this.startCondition) {
- this.startCondition.notifyAll();
- }
-
- this.jettyServer.join();
- } catch (Exception e) {
- logger.error("{}: error found while bringing up server", this, e);
- }
- }
-
- @Override
- public boolean waitedStart(long maxWaitTime) throws InterruptedException {
- logger.info("{}: WAITED-START", this);
-
- if (maxWaitTime < 0)
- throw new IllegalArgumentException("max-wait-time cannot be negative");
-
- long pendingWaitTime = maxWaitTime;
-
- if (!this.start())
- return false;
-
- synchronized (this.startCondition) {
-
- while (!this.jettyServer.isRunning()) {
- try {
- long startTs = System.currentTimeMillis();
-
- this.startCondition.wait(pendingWaitTime);
-
- if (maxWaitTime == 0)
- /* spurious notification */
- continue;
-
- long endTs = System.currentTimeMillis();
- pendingWaitTime = pendingWaitTime - (endTs - startTs);
-
- logger.info("{}: pending time is {} ms.", this, pendingWaitTime);
-
- if (pendingWaitTime <= 0)
- return false;
-
- } catch (InterruptedException e) {
- logger.warn("{}: waited-start has been interrupted", this);
- throw e;
- }
- }
-
- return this.jettyServer.isRunning();
- }
- }
-
- @Override
- public boolean start() {
- logger.info("{}: STARTING", this);
-
- synchronized(this) {
- if (jettyThread == null ||
- !this.jettyThread.isAlive()) {
-
- this.jettyThread = new Thread(this);
- this.jettyThread.setName(this.name + "-" + this.port);
- this.jettyThread.start();
- }
- }
-
- return true;
- }
-
- @Override
- public boolean stop() {
- logger.info("{}: STOPPING", this);
-
- synchronized(this) {
- if (jettyThread == null) {
- return true;
- }
-
- if (!jettyThread.isAlive()) {
- this.jettyThread = null;
- }
-
- try {
- this.connector.stop();
- } catch (Exception e) {
- logger.error("{}: error while stopping management server", this, e);
- }
-
- try {
- this.jettyServer.stop();
- } catch (Exception e) {
- logger.error("{}: error while stopping management server", this, e);
- return false;
- }
-
- Thread.yield();
- }
-
- return true;
- }
-
- @Override
- public void shutdown() {
- logger.info("{}: SHUTTING DOWN", this);
-
- this.stop();
-
- if (this.jettyThread == null)
- return;
-
- Thread jettyThreadCopy = this.jettyThread;
-
- if (jettyThreadCopy.isAlive()) {
- try {
- jettyThreadCopy.join(2000L);
- } catch (InterruptedException e) {
- logger.warn("{}: error while shutting down management server", this);
- Thread.currentThread().interrupt();
- }
- if (!jettyThreadCopy.isInterrupted()) {
- try {
- jettyThreadCopy.interrupt();
- } catch(Exception e) {
- // do nothing
- logger.warn("{}: exception while shutting down (OK)", this, e);
- }
- }
- }
-
- this.jettyServer.destroy();
- }
-
- @Override
- public boolean isAlive() {
- if (this.jettyThread != null)
- return this.jettyThread.isAlive();
-
- return false;
- }
-
- @Override
- public int getPort() {
- return this.port;
- }
-
- /**
- * @return the name
- */
- public String getName() {
- return name;
- }
-
- /**
- * @return the host
- */
- public String getHost() {
- return host;
- }
-
- /**
- * @return the user
- */
- public String getUser() {
- return user;
- }
-
- /**
- * @return the password
- */
- @JsonIgnore
- public String getPassword() {
- return password;
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("JettyServer [name=").append(name).append(", host=").append(host).append(", port=").append(port)
- .append(", user=").append(user).append(", password=").append(password != null).append(", contextPath=")
- .append(contextPath).append(", jettyServer=").append(jettyServer).append(", context=").append(this.context)
- .append(", connector=").append(connector).append(", jettyThread=").append(jettyThread)
- .append("]");
- return builder.toString();
- }
-
-}