diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/openecomp')
33 files changed, 5494 insertions, 0 deletions
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/Topic.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/Topic.java new file mode 100644 index 00000000..d38bab5a --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/Topic.java @@ -0,0 +1,76 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm; + +import java.util.List; + +/** + * Essential Topic Data + */ +public interface Topic { + + public static final String NETWORK_LOGGER = "networkLogger"; + + /** + * Underlying Communication infrastructure Types + */ + public enum CommInfrastructure { + /** + * UEB Communication Infrastructure + */ + UEB, + /** + * DMAAP Communication Infrastructure + */ + DMAAP, + /** + * REST Communication Infrastructure + */ + REST + } + + /** + * gets the topic name + * + * @return topic name + */ + public String getTopic(); + + /** + * gets the communication infrastructure type + * @return + */ + public CommInfrastructure getTopicCommInfrastructure(); + + /** + * return list of servers + * @return bus servers + */ + public List<String> getServers(); + + /** + * get the more recent events in this topic entity + * + * @return list of most recent events + */ + public String[] getRecentEvents(); + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicEndpoint.java new file mode 100644 index 00000000..b3f236f7 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicEndpoint.java @@ -0,0 +1,692 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSink; +import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSource; +import org.openecomp.policy.drools.event.comm.bus.UebTopicSink; +import org.openecomp.policy.drools.event.comm.bus.UebTopicSource; +import org.openecomp.policy.common.logging.flexlogger.FlexLogger; +import org.openecomp.policy.common.logging.flexlogger.Logger; +import org.openecomp.policy.common.logging.eelf.MessageCodes; +import org.openecomp.policy.drools.properties.Lockable; +import org.openecomp.policy.drools.properties.Startable; +import com.fasterxml.jackson.annotation.JsonIgnore; + +/** + * Abstraction to managed the system's Networked Topic Endpoints, + * sources of all events input into the System. + */ +public interface TopicEndpoint extends Startable, Lockable { + + /** + * Add Topic Sources to the communication infrastructure initialized per + * properties + * + * @param properties properties for Topic Source construction + * @return a generic Topic Source + * @throws IllegalArgumentException when invalid arguments are provided + */ + public List<? extends TopicSource> addTopicSources(Properties properties) + throws IllegalArgumentException; + + /** + * Add Topic Sinks to the communication infrastructure initialized per + * properties + * + * @param properties properties for Topic Sink construction + * @return a generic Topic Sink + * @throws IllegalArgumentException when invalid arguments are provided + */ + public List<? extends TopicSink> addTopicSinks(Properties properties) + throws IllegalArgumentException; + + /** + * gets all Topic Sources + * @return the Topic Source List + */ + List<? extends TopicSource> getTopicSources(); + + /** + * get the Topic Sources for the given topic name + * + * @param topicName the topic name + * + * @return the Topic Source List + * @throws IllegalStateException if the entity is in an invalid state + * @throws IllegalArgumentException if invalid parameters are present + */ + public List<? extends TopicSource> getTopicSources(List<String> topicNames) + throws IllegalStateException, IllegalArgumentException; + + /** + * gets the Topic Source for the given topic name and + * underlying communication infrastructure type + * + * @param commType communication infrastructure type + * @param topicName the topic name + * + * @return the Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for + * example multiple TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + * @throws UnsupportedOperationException if the operation is not supported. + */ + public TopicSource getTopicSource(Topic.CommInfrastructure commType, + String topicName) + throws IllegalStateException, IllegalArgumentException, + UnsupportedOperationException; + + /** + * get the UEB Topic Source for the given topic name + * + * @param topicName the topic name + * + * @return the UEB Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for + * example multiple TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public UebTopicSource getUebTopicSource(String topicName) + throws IllegalStateException, IllegalArgumentException; + + /** + * get the DMAAP Topic Source for the given topic name + * + * @param topicName the topic name + * + * @return the DMAAP Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for + * example multiple TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSource getDmaapTopicSource(String topicName) + throws IllegalStateException, IllegalArgumentException; + + /** + * get the Topic Sinks for the given topic name + * + * @param topicNames the topic names + * @return the Topic Sink List + * @throws IllegalStateException + * @throws IllegalArgumentException + */ + public List<? extends TopicSink> getTopicSinks(List<String> topicNames) + throws IllegalStateException, IllegalArgumentException; + + /** + * get the Topic Sinks for the given topic name and + * underlying communication infrastructure type + * + * @param topicName the topic name + * @param commType communication infrastructure type + * + * @return the Topic Sink List + * @throws IllegalStateException if the entity is in an invalid state, for + * example multiple TopicWriters for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public TopicSink getTopicSink(Topic.CommInfrastructure commType, + String topicName) + throws IllegalStateException, IllegalArgumentException, + UnsupportedOperationException; + + /** + * get the Topic Sinks for the given topic name and + * all the underlying communication infrastructure type + * + * @param topicName the topic name + * @param commType communication infrastructure type + * + * @return the Topic Sink List + * @throws IllegalStateException if the entity is in an invalid state, for + * example multiple TopicWriters for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public List<? extends TopicSink> getTopicSinks(String topicName) + throws IllegalStateException, IllegalArgumentException; + + /** + * get the UEB Topic Source for the given topic name + * + * @param topicName the topic name + * + * @return the Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for + * example multiple TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public UebTopicSink getUebTopicSink(String topicName) + throws IllegalStateException, IllegalArgumentException; + + /** + * get the DMAAP Topic Source for the given topic name + * + * @param topicName the topic name + * + * @return the Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for + * example multiple TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSink getDmaapTopicSink(String topicName) + throws IllegalStateException, IllegalArgumentException; + + /** + * gets only the UEB Topic Sources + * @return the UEB Topic Source List + */ + public List<UebTopicSource> getUebTopicSources(); + + /** + * gets only the DMAAP Topic Sources + * @return the DMAAP Topic Source List + */ + public List<DmaapTopicSource> getDmaapTopicSources(); + + /** + * gets all Topic Sinks + * @return the Topic Sink List + */ + public List<? extends TopicSink> getTopicSinks(); + + /** + * gets only the UEB Topic Sinks + * @return the UEB Topic Sink List + */ + public List<UebTopicSink> getUebTopicSinks(); + + /** + * gets only the DMAAP Topic Sinks + * @return the DMAAP Topic Sink List + */ + public List<DmaapTopicSink> getDmaapTopicSinks(); + + /** + * singleton for global access + */ + public static final TopicEndpoint manager = new ProxyTopicEndpointManager(); +} + +/* + * ----------------- implementation ------------------- + */ + +/** + * This implementation of the Topic Endpoint Manager, proxies operations to appropriate + * implementations according to the communication infrastructure that are supported + */ +class ProxyTopicEndpointManager implements TopicEndpoint { + // get an instance of logger + private static Logger logger = FlexLogger.getLogger(ProxyTopicEndpointManager.class); + /** + * Is this element locked? + */ + protected volatile boolean locked = false; + + /** + * Is this element alive? + */ + protected volatile boolean alive = false; + + /** + * {@inheritDoc} + */ + @Override + public List<? extends TopicSource> addTopicSources(Properties properties) throws IllegalArgumentException { + + // 1. Create UEB Sources + // 2. Create DMAAP Sources + + List<TopicSource> sources = new ArrayList<TopicSource>(); + + sources.addAll(UebTopicSource.factory.build(properties)); + sources.addAll(DmaapTopicSource.factory.build(properties)); + + if (this.isLocked()) { + for (TopicSource source : sources) { + source.lock(); + } + } + + return sources; + } + + /** + * {@inheritDoc} + */ + @Override + public List<? extends TopicSink> addTopicSinks(Properties properties) throws IllegalArgumentException { + // 1. Create UEB Sinks + // 2. Create DMAAP Sinks + + List<TopicSink> sinks = new ArrayList<TopicSink>(); + + sinks.addAll(UebTopicSink.factory.build(properties)); + sinks.addAll(DmaapTopicSink.factory.build(properties)); + + if (this.isLocked()) { + for (TopicSink sink : sinks) { + sink.lock(); + } + } + + return sinks; + } + + /** + * {@inheritDoc} + */ + @Override + public List<? extends TopicSource> getTopicSources() { + + List<TopicSource> sources = new ArrayList<TopicSource>(); + + sources.addAll(UebTopicSource.factory.inventory()); + sources.addAll(DmaapTopicSource.factory.inventory()); + + return sources; + } + + /** + * {@inheritDoc} + */ + @Override + public List<? extends TopicSink> getTopicSinks() { + + List<TopicSink> sinks = new ArrayList<TopicSink>(); + + sinks.addAll(UebTopicSink.factory.inventory()); + sinks.addAll(DmaapTopicSink.factory.inventory()); + + return sinks; + } + + /** + * {@inheritDoc} + */ + @JsonIgnore + @Override + public List<UebTopicSource> getUebTopicSources() { + return UebTopicSource.factory.inventory(); + } + + /** + * {@inheritDoc} + */ + @JsonIgnore + @Override + public List<DmaapTopicSource> getDmaapTopicSources() { + return DmaapTopicSource.factory.inventory(); + } + + /** + * {@inheritDoc} + */ + @JsonIgnore + @Override + public List<UebTopicSink> getUebTopicSinks() { + return UebTopicSink.factory.inventory(); + } + + /** + * {@inheritDoc} + */ + @JsonIgnore + @Override + public List<DmaapTopicSink> getDmaapTopicSinks() { + return DmaapTopicSink.factory.inventory(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean start() throws IllegalStateException { + + synchronized (this) { + if (this.locked) { + throw new IllegalStateException(this + " is locked"); + } + + if (this.alive) { + return true; + } + + this.alive = true; + } + + List<Startable> endpoints = getEndpoints(); + + boolean success = true; + for (Startable endpoint: endpoints) { + try { + success = endpoint.start() && success; + } catch (Exception e) { + success = false; + logger.error(MessageCodes.EXCEPTION_ERROR, e, endpoint.toString(), this.toString()); + } + } + + return success; + } + + + /** + * {@inheritDoc} + */ + @Override + public boolean stop() throws IllegalStateException { + + /* + * stop regardless if it is locked, in other + * words, stop operation has precedence over + * locks. + */ + synchronized (this) { + this.alive = false; + } + + List<Startable> endpoints = getEndpoints(); + + boolean success = true; + for (Startable endpoint: endpoints) { + try { + success = endpoint.stop() && success; + } catch (Exception e) { + success = false; + logger.error(MessageCodes.EXCEPTION_ERROR, e, endpoint.toString(), this.toString()); + } + } + + return success; + } + + /** + * + * @return list of managed endpoints + */ + @JsonIgnore + protected List<Startable> getEndpoints() { + List<Startable> endpoints = new ArrayList<Startable>(); + + endpoints.addAll(this.getTopicSources()); + endpoints.addAll(this.getTopicSinks()); + + return endpoints; + } + + + /** + * {@inheritDoc} + */ + @Override + public void shutdown() throws IllegalStateException { + UebTopicSource.factory.destroy(); + UebTopicSink.factory.destroy(); + + DmaapTopicSource.factory.destroy(); + DmaapTopicSink.factory.destroy(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isAlive() { + return this.alive; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean lock() { + + synchronized (this) { + if (locked) + return true; + + this.locked = true; + } + + for (TopicSource source: this.getTopicSources()) { + source.lock(); + } + + for (TopicSink sink: this.getTopicSinks()) { + sink.lock(); + } + + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean unlock() { + synchronized (this) { + if (!locked) + return true; + + this.locked = false; + } + + for (TopicSource source: this.getTopicSources()) { + source.unlock(); + } + + for (TopicSink sink: this.getTopicSinks()) { + sink.unlock(); + } + + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isLocked() { + return this.locked; + } + + /** + * {@inheritDoc} + */ + @Override + public List<? extends TopicSource> getTopicSources(List<String> topicNames) + throws IllegalStateException, IllegalArgumentException { + + if (topicNames == null) { + throw new IllegalArgumentException("must provide a list of topics"); + } + + List<TopicSource> sources = new ArrayList<TopicSource>(); + for (String topic: topicNames) { + try { + TopicSource uebSource = this.getUebTopicSource(topic); + if (uebSource != null) + sources.add(uebSource); + } catch (Exception e) { + logger.info("No UEB source for topic: " + topic); + } + + try { + TopicSource dmaapSource = this.getDmaapTopicSource(topic); + if (dmaapSource != null) + sources.add(dmaapSource); + } catch (Exception e) { + logger.info("No DMAAP source for topic: " + topic); + } + } + return sources; + } + + /** + * {@inheritDoc} + */ + @Override + public List<? extends TopicSink> getTopicSinks(List<String> topicNames) + throws IllegalStateException, IllegalArgumentException { + + if (topicNames == null) { + throw new IllegalArgumentException("must provide a list of topics"); + } + + List<TopicSink> sinks = new ArrayList<TopicSink>(); + for (String topic: topicNames) { + try { + TopicSink uebSink = this.getUebTopicSink(topic); + if (uebSink != null) + sinks.add(uebSink); + } catch (Exception e) { + logger.info("No UEB sink for topic: " + topic); + } + + try { + TopicSink dmaapSink = this.getDmaapTopicSink(topic); + if (dmaapSink != null) + sinks.add(dmaapSink); + } catch (Exception e) { + logger.info("No DMAAP sink for topic: " + topic); + } + } + return sinks; + } + + /** + * {@inheritDoc} + */ + @Override + public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) + throws IllegalStateException, IllegalArgumentException, UnsupportedOperationException { + + if (commType == null) { + throw new IllegalArgumentException + ("Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + if (topicName == null) { + throw new IllegalArgumentException + ("Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + switch (commType) { + case UEB: + return this.getUebTopicSource(topicName); + case DMAAP: + return this.getDmaapTopicSource(topicName); + case REST: + default: + throw new UnsupportedOperationException("Unsupported " + commType.name()); + } + } + + /** + * {@inheritDoc} + */ + @Override + public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) + throws IllegalStateException, IllegalArgumentException, UnsupportedOperationException { + if (commType == null) { + throw new IllegalArgumentException + ("Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + if (topicName == null) { + throw new IllegalArgumentException + ("Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + switch (commType) { + case UEB: + return this.getUebTopicSink(topicName); + case DMAAP: + return this.getDmaapTopicSink(topicName); + case REST: + default: + throw new UnsupportedOperationException("Unsupported " + commType.name()); + } + } + + /** + * {@inheritDoc} + */ + @Override + public List<? extends TopicSink> getTopicSinks(String topicName) + throws IllegalStateException, IllegalArgumentException { + + if (topicName == null) { + throw new IllegalArgumentException + ("Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + List<TopicSink> sinks = new ArrayList<TopicSink>(); + + try { + sinks.add(this.getUebTopicSink(topicName)); + } catch (Exception e) { + ; + } + + try { + sinks.add(this.getDmaapTopicSink(topicName)); + } catch (Exception e) { + ; + } + + return sinks; + } + + /** + * {@inheritDoc} + */ + @Override + public UebTopicSource getUebTopicSource(String topicName) throws IllegalStateException, IllegalArgumentException { + return UebTopicSource.factory.get(topicName); + } + + /** + * {@inheritDoc} + */ + @Override + public UebTopicSink getUebTopicSink(String topicName) throws IllegalStateException, IllegalArgumentException { + return UebTopicSink.factory.get(topicName); + } + + @Override + public DmaapTopicSource getDmaapTopicSource(String topicName) + throws IllegalStateException, IllegalArgumentException { + return DmaapTopicSource.factory.get(topicName); + } + + @Override + public DmaapTopicSink getDmaapTopicSink(String topicName) throws IllegalStateException, IllegalArgumentException { + return DmaapTopicSink.factory.get(topicName); + } + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java new file mode 100644 index 00000000..7a2e9711 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java @@ -0,0 +1,42 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm; + +/** + * Listener for event messages entering the Policy Engine + */ +public interface TopicListener { + + /** + * Notification of a new Event over a given Topic + * + * @param commType communication infrastructure type + * @param topic topic name + * @param event event message as a string + * + * @return boolean. True if the invoking event dispatcher should continue + * dispatching the event to subsequent listeners. False if it is requested + * to the invoking event dispatcher to stop dispatching the same event to + * other listeners of less priority. This mechanism is generally not used. + */ + public boolean onTopicEvent(Topic.CommInfrastructure commType, String topic, String event); + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicRegisterable.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicRegisterable.java new file mode 100644 index 00000000..2ce8e9e7 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicRegisterable.java @@ -0,0 +1,42 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm; + +/** + * Marks a Topic entity as registerable + */ +public interface TopicRegisterable { + + /** + * Register for notification of events with this Topic Entity + * + * @param topicListener the listener of events + */ + public void register(TopicListener topicListener); + + /** + * Unregisters for notification of events with this Topic Entity + * + * @param topicListener the listener of events + */ + public void unregister(TopicListener topicListener); + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSink.java new file mode 100644 index 00000000..2250b1ea --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSink.java @@ -0,0 +1,42 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm; + +import org.openecomp.policy.drools.properties.Lockable; +import org.openecomp.policy.drools.properties.Startable; + +/** + * Marks a given Topic Endpoint as able to send messages over a topic + */ +public interface TopicSink extends Topic, Startable, Lockable { + + /** + * Sends a string message over this Topic Endpoint + * + * @param message message to send + * @return true if the send operation succeeded, false otherwise + * @throws IllegalArgumentException an invalid message has been provided + * @throws IllegalStateException the entity is in an state that prevents + * it from sending messages, for example, locked or stopped. + */ + public boolean send(String message) throws IllegalArgumentException, IllegalStateException; + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSource.java new file mode 100644 index 00000000..0dfbe1c4 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSource.java @@ -0,0 +1,40 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm; + +import org.openecomp.policy.drools.properties.Lockable; +import org.openecomp.policy.drools.properties.Startable; + +/** + * Marker for a Topic Entity, indicating that the entity is able to read + * over a topic + */ +public interface TopicSource extends TopicRegisterable, Topic, Startable, Lockable { + + /** + * pushes an event into the source programatically + * + * @param event the event in json format + * @return true if it can be processed correctly, false otherwise + */ + public boolean offer(String event); + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopic.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopic.java new file mode 100644 index 00000000..c38f627e --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopic.java @@ -0,0 +1,26 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus; + +public interface BusTopic { + public String getApiKey(); + public String getApiSecret(); +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSink.java new file mode 100644 index 00000000..30978c27 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSink.java @@ -0,0 +1,47 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus; + +import org.openecomp.policy.drools.event.comm.TopicSink; + +/** + * Topic Sink over Bus Infrastructure (DMAAP/UEB) + */ +public interface BusTopicSink extends BusTopic, TopicSink { + /** + * Log Failures after X number of retries + */ + public static final int DEFAULT_LOG_SEND_FAILURES_AFTER = 1; + + /** + * Sets the UEB partition key for published messages + * + * @param partitionKey the partition key + */ + public void setPartitionKey(String partitionKey); + + /** + * return the partition key in used by the system to publish messages + * + * @return the partition key + */ + public String getPartitionKey(); +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSource.java new file mode 100644 index 00000000..e6a46d2f --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSource.java @@ -0,0 +1,83 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus; + +import org.openecomp.policy.drools.event.comm.TopicSource; + +/** + * Generic Topic Source for UEB/DMAAP Communication Infrastructure + * + */ +public interface BusTopicSource extends BusTopic, TopicSource { + + /** + * Default Consumer Instance Value + */ + public static String DEFAULT_CONSUMER_INSTANCE = "0"; + + /** + * Default Timeout fetching in milliseconds + */ + public static int DEFAULT_TIMEOUT_MS_FETCH = 15000; + + /** + * Default maximum number of messages fetch at the time + */ + public static int DEFAULT_LIMIT_FETCH = 100; + + /** + * Definition of No Timeout fetching + */ + public static int NO_TIMEOUT_MS_FETCH = -1; + + /** + * Definition of No limit fetching + */ + public static int NO_LIMIT_FETCH = -1; + + /** + * gets the consumer group + * + * @return consumer group + */ + public String getConsumerGroup(); + + /** + * gets the consumer instance + * + * @return consumer instance + */ + public String getConsumerInstance(); + + /** + * gets the fetch timeout + * + * @return fetch timeout + */ + public int getFetchTimeout(); + + /** + * gets the fetch limit + * + * @return fetch limit + */ + public int getFetchLimit(); +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSink.java new file mode 100644 index 00000000..3c55c9f3 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSink.java @@ -0,0 +1,30 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus; + +public interface DmaapTopicSink extends BusTopicSink { + + /** + * Factory of UebTopicWriter for instantiation and management purposes + */ + + public static final DmaapTopicSinkFactory factory = new IndexedDmaapTopicSinkFactory(); +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java new file mode 100644 index 00000000..5b4cfd42 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java @@ -0,0 +1,308 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; + +import org.openecomp.policy.drools.event.comm.bus.internal.InlineDmaapTopicSink; +import org.openecomp.policy.common.logging.flexlogger.FlexLogger; +import org.openecomp.policy.common.logging.flexlogger.Logger; +import org.openecomp.policy.drools.properties.PolicyProperties; + +/** + * DMAAP Topic Sink Factory + */ +public interface DmaapTopicSinkFactory { + + /** + * Instantiates a new DMAAP Topic Sink + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * @param userName AAF user name + * @param password AAF password + * @param partitionKey Consumer Group + * @param managed is this sink endpoint managed? + * + * @return an DMAAP Topic Sink + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSink build(List<String> servers, + String topic, + String apiKey, + String apiSecret, + String userName, + String password, + String partitionKey, + boolean managed) + throws IllegalArgumentException; + + /** + * Creates an DMAAP Topic Sink based on properties files + * + * @param properties Properties containing initialization values + * + * @return an DMAAP Topic Sink + * @throws IllegalArgumentException if invalid parameters are present + */ + public List<DmaapTopicSink> build(Properties properties) + throws IllegalArgumentException; + + /** + * Instantiates a new DMAAP Topic Sink + * + * @param servers list of servers + * @param topic topic name + * + * @return an DMAAP Topic Sink + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSink build(List<String> servers, String topic) + throws IllegalArgumentException; + + /** + * Destroys an DMAAP Topic Sink based on a topic + * + * @param topic topic name + * @throws IllegalArgumentException if invalid parameters are present + */ + public void destroy(String topic); + + /** + * gets an DMAAP Topic Sink based on topic name + * @param topic the topic name + * + * @return an DMAAP Topic Sink with topic name + * @throws IllegalArgumentException if an invalid topic is provided + * @throws IllegalStateException if the DMAAP Topic Reader is + * an incorrect state + */ + public DmaapTopicSink get(String topic) + throws IllegalArgumentException, IllegalStateException; + + /** + * Provides a snapshot of the DMAAP Topic Sinks + * @return a list of the DMAAP Topic Sinks + */ + public List<DmaapTopicSink> inventory(); + + /** + * Destroys all DMAAP Topic Sinks + */ + public void destroy(); +} + +/* ------------- implementation ----------------- */ + +/** + * Factory of DMAAP Reader Topics indexed by topic name + */ +class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { + // get an instance of logger + private static Logger logger = FlexLogger.getLogger(IndexedDmaapTopicSinkFactory.class); + + /** + * DMAAP Topic Name Index + */ + protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = + new HashMap<String, DmaapTopicSink>(); + + /** + * {@inheritDoc} + */ + @Override + public DmaapTopicSink build(List<String> servers, + String topic, + String apiKey, + String apiSecret, + String userName, + String password, + String partitionKey, + boolean managed) + throws IllegalArgumentException { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + synchronized (this) { + if (dmaapTopicWriters.containsKey(topic)) { + return dmaapTopicWriters.get(topic); + } + + DmaapTopicSink dmaapTopicSink = + new InlineDmaapTopicSink(servers, topic, + apiKey, apiSecret, + userName, password, + partitionKey); + + if (managed) + dmaapTopicWriters.put(topic, dmaapTopicSink); + return dmaapTopicSink; + } + } + + + /** + * {@inheritDoc} + */ + @Override + public DmaapTopicSink build(List<String> servers, String topic) throws IllegalArgumentException { + return this.build(servers, topic, null, null, null, null, null, true); + } + + + /** + * {@inheritDoc} + */ + @Override + public List<DmaapTopicSink> build(Properties properties) throws IllegalArgumentException { + + String writeTopics = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS); + if (writeTopics == null || writeTopics.isEmpty()) { + logger.warn("No topic for DMAAP Sink " + properties); + return new ArrayList<DmaapTopicSink>(); + } + List<String> writeTopicList = new ArrayList<String>(Arrays.asList(writeTopics.split("\\s*,\\s*"))); + + synchronized(this) { + List<DmaapTopicSink> dmaapTopicWriters = new ArrayList<DmaapTopicSink>(); + for (String topic: writeTopicList) { + + String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + if (servers == null || servers.isEmpty()) { + logger.error("No DMAAP servers provided in " + properties); + continue; + } + + List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*"))); + + String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); + String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); + + String aafMechId = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX); + String aafPassword = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX); + + String partitionKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX); + + String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + + PolicyProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, + apiKey, apiSecret, aafMechId, aafPassword, + partitionKey, managed); + dmaapTopicWriters.add(dmaapTopicSink); + } + return dmaapTopicWriters; + } + } + + /** + * {@inheritDoc} + */ + @Override + public void destroy(String topic) + throws IllegalArgumentException { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + DmaapTopicSink dmaapTopicWriter; + synchronized(this) { + if (!dmaapTopicWriters.containsKey(topic)) { + return; + } + + dmaapTopicWriter = dmaapTopicWriters.remove(topic); + } + + dmaapTopicWriter.shutdown(); + } + + /** + * {@inheritDoc} + */ + @Override + public void destroy() { + List<DmaapTopicSink> writers = this.inventory(); + for (DmaapTopicSink writer: writers) { + writer.shutdown(); + } + + synchronized(this) { + this.dmaapTopicWriters.clear(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public DmaapTopicSink get(String topic) + throws IllegalArgumentException, IllegalStateException { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + synchronized(this) { + if (dmaapTopicWriters.containsKey(topic)) { + return dmaapTopicWriters.get(topic); + } else { + throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public synchronized List<DmaapTopicSink> inventory() { + List<DmaapTopicSink> writers = + new ArrayList<DmaapTopicSink>(this.dmaapTopicWriters.values()); + return writers; + } + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSource.java new file mode 100644 index 00000000..8da7906a --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSource.java @@ -0,0 +1,29 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus; + +public interface DmaapTopicSource extends BusTopicSource { + + /** + * factory for managing and tracking DMAAP sources + */ + public static DmaapTopicSourceFactory factory = new IndexedDmaapTopicSourceFactory(); +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java new file mode 100644 index 00000000..f8d85eb7 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java @@ -0,0 +1,380 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; + +import org.openecomp.policy.drools.event.comm.bus.internal.SingleThreadedDmaapTopicSource; +import org.openecomp.policy.common.logging.flexlogger.FlexLogger; +import org.openecomp.policy.common.logging.flexlogger.Logger; +import org.openecomp.policy.drools.properties.PolicyProperties; +/** + * DMAAP Topic Source Factory + */ +public interface DmaapTopicSourceFactory { + + /** + * Creates an DMAAP Topic Source based on properties files + * + * @param properties Properties containing initialization values + * + * @return an DMAAP Topic Source + * @throws IllegalArgumentException if invalid parameters are present + */ + public List<DmaapTopicSource> build(Properties properties) + throws IllegalArgumentException; + + /** + * Instantiates a new DMAAP Topic Source + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * @param userName user name + * @param password password + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Read Fetch Timeout + * @param fetchLimit Fetch Limit + * @param managed is this endpoind managed? + * + * @return an DMAAP Topic Source + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSource build(List<String> servers, + String topic, + String apiKey, + String apiSecret, + String userName, + String password, + String consumerGroup, + String consumerInstance, + int fetchTimeout, + int fetchLimit, + boolean managed) + throws IllegalArgumentException; + + /** + * Instantiates a new DMAAP Topic Source + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * + * @return an DMAAP Topic Source + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSource build(List<String> servers, + String topic, + String apiKey, + String apiSecret) + throws IllegalArgumentException; + + /** + * Instantiates a new DMAAP Topic Source + * + * @param uebTopicReaderType Implementation type + * @param servers list of servers + * @param topic topic name + * + * @return an DMAAP Topic Source + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSource build(List<String> servers, + String topic) + throws IllegalArgumentException; + + /** + * Destroys an DMAAP Topic Source based on a topic + * + * @param topic topic name + * @throws IllegalArgumentException if invalid parameters are present + */ + public void destroy(String topic); + + /** + * Destroys all DMAAP Topic Sources + */ + public void destroy(); + + /** + * gets an DMAAP Topic Source based on topic name + * @param topic the topic name + * @return an DMAAP Topic Source with topic name + * @throws IllegalArgumentException if an invalid topic is provided + * @throws IllegalStateException if the DMAAP Topic Source is + * an incorrect state + */ + public DmaapTopicSource get(String topic) + throws IllegalArgumentException, IllegalStateException; + + /** + * Provides a snapshot of the DMAAP Topic Sources + * @return a list of the DMAAP Topic Sources + */ + public List<DmaapTopicSource> inventory(); +} + + +/* ------------- implementation ----------------- */ + +/** + * Factory of DMAAP Source Topics indexed by topic name + */ + +class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { + // get an instance of logger + private static Logger logger = FlexLogger.getLogger(IndexedDmaapTopicSourceFactory.class); + /** + * UEB Topic Name Index + */ + protected HashMap<String, DmaapTopicSource> dmaapTopicSources = + new HashMap<String, DmaapTopicSource>(); + + /** + * {@inheritDoc} + */ + @Override + public DmaapTopicSource build(List<String> servers, + String topic, + String apiKey, + String apiSecret, + String userName, + String password, + String consumerGroup, + String consumerInstance, + int fetchTimeout, + int fetchLimit, + boolean managed) + throws IllegalArgumentException { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + synchronized(this) { + if (dmaapTopicSources.containsKey(topic)) { + return dmaapTopicSources.get(topic); + } + + DmaapTopicSource dmaapTopicSource = + new SingleThreadedDmaapTopicSource(servers, topic, + apiKey, apiSecret, userName, password, + consumerGroup, consumerInstance, + fetchTimeout, fetchLimit); + + if (managed) + dmaapTopicSources.put(topic, dmaapTopicSource); + + return dmaapTopicSource; + } + } + + /** + * {@inheritDoc} + */ + @Override + public List<DmaapTopicSource> build(Properties properties) + throws IllegalArgumentException { + + String readTopics = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS); + if (readTopics == null || readTopics.isEmpty()) { + logger.warn("No topic for UEB Source " + properties); + return new ArrayList<DmaapTopicSource>(); + } + List<String> readTopicList = new ArrayList<String>(Arrays.asList(readTopics.split("\\s*,\\s*"))); + + List<DmaapTopicSource> dmaapTopicSource_s = new ArrayList<DmaapTopicSource>(); + synchronized(this) { + for (String topic: readTopicList) { + + String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + + if (servers == null || servers.isEmpty()) { + logger.error("No UEB servers provided in " + properties); + continue; + } + + List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*"))); + + String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); + + String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); + + String aafMechId = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX); + + String aafPassword = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX); + + String consumerGroup = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX); + + String consumerInstance = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX); + + String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); + int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH; + if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) { + try { + fetchTimeout = Integer.parseInt(fetchTimeoutString); + } catch (NumberFormatException nfe) { + logger.warn("Fetch Timeout in invalid format for topic " + topic + ": " + fetchTimeoutString); + } + } + + String fetchLimitString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); + int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH; + if (fetchLimitString != null && !fetchLimitString.isEmpty()) { + try { + fetchLimit = Integer.parseInt(fetchLimitString); + } catch (NumberFormatException nfe) { + logger.warn("Fetch Limit in invalid format for topic " + topic + ": " + fetchLimitString); + } + } + + String managedString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + DmaapTopicSource uebTopicSource = this.build(serverList, topic, + apiKey, apiSecret, aafMechId, aafPassword, + consumerGroup, consumerInstance, + fetchTimeout, fetchLimit, managed); + dmaapTopicSource_s.add(uebTopicSource); + } + } + return dmaapTopicSource_s; + } + + /** + * {@inheritDoc} + */ + @Override + public DmaapTopicSource build(List<String> servers, + String topic, + String apiKey, + String apiSecret) { + return this.build(servers, topic, + apiKey, apiSecret, null, null, + null, null, + DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, + DmaapTopicSource.DEFAULT_LIMIT_FETCH, + true); + } + + /** + * {@inheritDoc} + */ + @Override + public DmaapTopicSource build(List<String> servers, String topic) { + return this.build(servers, topic, null, null); + } + + /** + * {@inheritDoc} + */ + @Override + public void destroy(String topic) + throws IllegalArgumentException { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + DmaapTopicSource uebTopicSource; + + synchronized(this) { + if (!dmaapTopicSources.containsKey(topic)) { + return; + } + + uebTopicSource = dmaapTopicSources.remove(topic); + } + + uebTopicSource.shutdown(); + } + + /** + * {@inheritDoc} + */ + @Override + public DmaapTopicSource get(String topic) + throws IllegalArgumentException, IllegalStateException { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + synchronized(this) { + if (dmaapTopicSources.containsKey(topic)) { + return dmaapTopicSources.get(topic); + } else { + throw new IllegalArgumentException("DmaapTopicSource for " + topic + " not found"); + } + } + } + + @Override + public synchronized List<DmaapTopicSource> inventory() { + List<DmaapTopicSource> readers = + new ArrayList<DmaapTopicSource>(this.dmaapTopicSources.values()); + return readers; + } + + @Override + public void destroy() { + List<DmaapTopicSource> readers = this.inventory(); + for (DmaapTopicSource reader: readers) { + reader.shutdown(); + } + + synchronized(this) { + this.dmaapTopicSources.clear(); + } + } + +} + diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSink.java new file mode 100644 index 00000000..efa4dc5e --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSink.java @@ -0,0 +1,32 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus; + +/** + * Topic Writer over UEB Infrastructure + */ +public interface UebTopicSink extends BusTopicSink { + + /** + * Factory of UebTopicWriter for instantiation and management purposes + */ + public static final UebTopicSinkFactory factory = new IndexedUebTopicSinkFactory(); +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java new file mode 100644 index 00000000..85b98838 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java @@ -0,0 +1,292 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; + +import org.openecomp.policy.drools.event.comm.bus.internal.InlineUebTopicSink; +import org.openecomp.policy.common.logging.flexlogger.FlexLogger; +import org.openecomp.policy.common.logging.flexlogger.Logger; +import org.openecomp.policy.drools.properties.PolicyProperties; + +/** + * UEB Topic Sink Factory + */ +public interface UebTopicSinkFactory { + + /** + * Instantiates a new UEB Topic Writer + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * @param partitionKey Consumer Group + * @param managed is this sink endpoint managed? + * + * @return an UEB Topic Writer + * @throws IllegalArgumentException if invalid parameters are present + */ + public UebTopicSink build(List<String> servers, + String topic, + String apiKey, + String apiSecret, + String partitionKey, + boolean managed) + throws IllegalArgumentException; + + /** + * Creates an UEB Topic Writer based on properties files + * + * @param properties Properties containing initialization values + * + * @return an UEB Topic Writer + * @throws IllegalArgumentException if invalid parameters are present + */ + public List<UebTopicSink> build(Properties properties) + throws IllegalArgumentException; + + /** + * Instantiates a new UEB Topic Writer + * + * @param servers list of servers + * @param topic topic name + * + * @return an UEB Topic Writer + * @throws IllegalArgumentException if invalid parameters are present + */ + public UebTopicSink build(List<String> servers, String topic) + throws IllegalArgumentException; + + /** + * Destroys an UEB Topic Writer based on a topic + * + * @param topic topic name + * @throws IllegalArgumentException if invalid parameters are present + */ + public void destroy(String topic); + + /** + * gets an UEB Topic Writer based on topic name + * @param topic the topic name + * + * @return an UEB Topic Writer with topic name + * @throws IllegalArgumentException if an invalid topic is provided + * @throws IllegalStateException if the UEB Topic Reader is + * an incorrect state + */ + public UebTopicSink get(String topic) + throws IllegalArgumentException, IllegalStateException; + + /** + * Provides a snapshot of the UEB Topic Writers + * @return a list of the UEB Topic Writers + */ + public List<UebTopicSink> inventory(); + + /** + * Destroys all UEB Topic Writers + */ + public void destroy(); +} + +/* ------------- implementation ----------------- */ + +/** + * Factory of UEB Reader Topics indexed by topic name + */ +class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { + // get an instance of logger + private static Logger logger = FlexLogger.getLogger(IndexedUebTopicSinkFactory.class); + /** + * UEB Topic Name Index + */ + protected HashMap<String, UebTopicSink> uebTopicSinks = + new HashMap<String, UebTopicSink>(); + + /** + * {@inheritDoc} + */ + @Override + public UebTopicSink build(List<String> servers, + String topic, + String apiKey, + String apiSecret, + String partitionKey, + boolean managed) + throws IllegalArgumentException { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + synchronized (this) { + if (uebTopicSinks.containsKey(topic)) { + return uebTopicSinks.get(topic); + } + + UebTopicSink uebTopicWriter = + new InlineUebTopicSink(servers, topic, + apiKey, apiSecret,partitionKey); + + if (managed) + uebTopicSinks.put(topic, uebTopicWriter); + + return uebTopicWriter; + } + } + + + /** + * {@inheritDoc} + */ + @Override + public UebTopicSink build(List<String> servers, String topic) throws IllegalArgumentException { + return this.build(servers, topic, null, null, null, true); + } + + + /** + * {@inheritDoc} + */ + @Override + public List<UebTopicSink> build(Properties properties) throws IllegalArgumentException { + + String writeTopics = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS); + if (writeTopics == null || writeTopics.isEmpty()) { + logger.warn("No topic for UEB Sink " + properties); + return new ArrayList<UebTopicSink>(); + } + List<String> writeTopicList = new ArrayList<String>(Arrays.asList(writeTopics.split("\\s*,\\s*"))); + + synchronized(this) { + List<UebTopicSink> uebTopicWriters = new ArrayList<UebTopicSink>(); + for (String topic: writeTopicList) { + + String servers = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + + topic + + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + if (servers == null || servers.isEmpty()) { + logger.error("No UEB servers provided in " + properties); + continue; + } + + List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*"))); + + String apiKey = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); + String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); + String partitionKey = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX); + + String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + + PolicyProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + UebTopicSink uebTopicWriter = this.build(serverList, topic, + apiKey, apiSecret, + partitionKey, managed); + uebTopicWriters.add(uebTopicWriter); + } + return uebTopicWriters; + } + } + + /** + * {@inheritDoc} + */ + @Override + public void destroy(String topic) + throws IllegalArgumentException { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + UebTopicSink uebTopicWriter; + synchronized(this) { + if (!uebTopicSinks.containsKey(topic)) { + return; + } + + uebTopicWriter = uebTopicSinks.remove(topic); + } + + uebTopicWriter.shutdown(); + } + + /** + * {@inheritDoc} + */ + @Override + public void destroy() { + List<UebTopicSink> writers = this.inventory(); + for (UebTopicSink writer: writers) { + writer.shutdown(); + } + + synchronized(this) { + this.uebTopicSinks.clear(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public UebTopicSink get(String topic) + throws IllegalArgumentException, IllegalStateException { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + synchronized(this) { + if (uebTopicSinks.containsKey(topic)) { + return uebTopicSinks.get(topic); + } else { + throw new IllegalStateException("UebTopicSink for " + topic + " not found"); + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public synchronized List<UebTopicSink> inventory() { + List<UebTopicSink> writers = + new ArrayList<UebTopicSink>(this.uebTopicSinks.values()); + return writers; + } + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSource.java new file mode 100644 index 00000000..4da01302 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSource.java @@ -0,0 +1,34 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus; + +/** + * Topic Source for UEB Communication Infrastructure + * + */ +public interface UebTopicSource extends BusTopicSource { + + /** + * factory for managing and tracking UEB readers + */ + public static UebTopicSourceFactory factory = + new IndexedUebTopicSourceFactory(); +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java new file mode 100644 index 00000000..bf2a4038 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java @@ -0,0 +1,362 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; + +import org.openecomp.policy.drools.event.comm.bus.internal.SingleThreadedUebTopicSource; +import org.openecomp.policy.common.logging.flexlogger.FlexLogger; +import org.openecomp.policy.common.logging.flexlogger.Logger; +import org.openecomp.policy.drools.properties.PolicyProperties; + +/** + * UEB Topic Source Factory + */ +public interface UebTopicSourceFactory { + + /** + * Creates an UEB Topic Source based on properties files + * + * @param properties Properties containing initialization values + * + * @return an UEB Topic Source + * @throws IllegalArgumentException if invalid parameters are present + */ + public List<UebTopicSource> build(Properties properties) + throws IllegalArgumentException; + + /** + * Instantiates a new UEB Topic Source + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Read Fetch Timeout + * @param fetchLimit Fetch Limit + * @param managed is this source endpoint managed? + * + * @return an UEB Topic Source + * @throws IllegalArgumentException if invalid parameters are present + */ + public UebTopicSource build(List<String> servers, + String topic, + String apiKey, + String apiSecret, + String consumerGroup, + String consumerInstance, + int fetchTimeout, + int fetchLimit, + boolean managed) + throws IllegalArgumentException; + + /** + * Instantiates a new UEB Topic Source + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * + * @return an UEB Topic Source + * @throws IllegalArgumentException if invalid parameters are present + */ + public UebTopicSource build(List<String> servers, + String topic, + String apiKey, + String apiSecret) + throws IllegalArgumentException; + + /** + * Instantiates a new UEB Topic Source + * + * @param uebTopicSourceType Implementation type + * @param servers list of servers + * @param topic topic name + * + * @return an UEB Topic Source + * @throws IllegalArgumentException if invalid parameters are present + */ + public UebTopicSource build(List<String> servers, + String topic) + throws IllegalArgumentException; + + /** + * Destroys an UEB Topic Source based on a topic + * + * @param topic topic name + * @throws IllegalArgumentException if invalid parameters are present + */ + public void destroy(String topic); + + /** + * Destroys all UEB Topic Sources + */ + public void destroy(); + + /** + * gets an UEB Topic Source based on topic name + * @param topic the topic name + * @return an UEB Topic Source with topic name + * @throws IllegalArgumentException if an invalid topic is provided + * @throws IllegalStateException if the UEB Topic Source is + * an incorrect state + */ + public UebTopicSource get(String topic) + throws IllegalArgumentException, IllegalStateException; + + /** + * Provides a snapshot of the UEB Topic Sources + * @return a list of the UEB Topic Sources + */ + public List<UebTopicSource> inventory(); +} + +/* ------------- implementation ----------------- */ + +/** + * Factory of UEB Source Topics indexed by topic name + */ +class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { + // get an instance of logger + private static Logger logger = FlexLogger.getLogger(IndexedUebTopicSourceFactory.class); + /** + * UEB Topic Name Index + */ + protected HashMap<String, UebTopicSource> uebTopicSources = + new HashMap<String, UebTopicSource>(); + + /** + * {@inheritDoc} + */ + @Override + public UebTopicSource build(List<String> servers, + String topic, + String apiKey, + String apiSecret, + String consumerGroup, + String consumerInstance, + int fetchTimeout, + int fetchLimit, + boolean managed) + throws IllegalArgumentException { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + synchronized(this) { + if (uebTopicSources.containsKey(topic)) { + return uebTopicSources.get(topic); + } + + UebTopicSource uebTopicSource = + new SingleThreadedUebTopicSource(servers, topic, + apiKey, apiSecret, + consumerGroup, consumerInstance, + fetchTimeout, fetchLimit); + + if (managed) + uebTopicSources.put(topic, uebTopicSource); + + return uebTopicSource; + } + } + + /** + * {@inheritDoc} + */ + @Override + public List<UebTopicSource> build(Properties properties) + throws IllegalArgumentException { + + String readTopics = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS); + if (readTopics == null || readTopics.isEmpty()) { + logger.warn("No topic for UEB Source " + properties); + return new ArrayList<UebTopicSource>(); + } + List<String> readTopicList = new ArrayList<String>(Arrays.asList(readTopics.split("\\s*,\\s*"))); + + List<UebTopicSource> uebTopicSources = new ArrayList<UebTopicSource>(); + synchronized(this) { + for (String topic: readTopicList) { + + String servers = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + + topic + + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + + if (servers == null || servers.isEmpty()) { + logger.error("No UEB servers provided in " + properties); + continue; + } + + List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*"))); + + String apiKey = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); + + String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); + + String consumerGroup = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX); + + String consumerInstance = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX); + + String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); + int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH; + if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) { + try { + fetchTimeout = Integer.parseInt(fetchTimeoutString); + } catch (NumberFormatException nfe) { + logger.warn("Fetch Timeout in invalid format for topic " + topic + ": " + fetchTimeoutString); + } + } + + String fetchLimitString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + + "." + topic + + PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); + int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH; + if (fetchLimitString != null && !fetchLimitString.isEmpty()) { + try { + fetchLimit = Integer.parseInt(fetchLimitString); + } catch (NumberFormatException nfe) { + logger.warn("Fetch Limit in invalid format for topic " + topic + ": " + fetchLimitString); + } + } + + String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + + topic + PolicyProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + UebTopicSource uebTopicSource = this.build(serverList, topic, + apiKey, apiSecret, + consumerGroup, consumerInstance, + fetchTimeout, fetchLimit, managed); + uebTopicSources.add(uebTopicSource); + } + } + return uebTopicSources; + } + + /** + * {@inheritDoc} + */ + @Override + public UebTopicSource build(List<String> servers, + String topic, + String apiKey, + String apiSecret) { + return this.build(servers, topic, + apiKey, apiSecret, + null, null, + UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH, + UebTopicSource.DEFAULT_LIMIT_FETCH, true); + } + + /** + * {@inheritDoc} + */ + @Override + public UebTopicSource build(List<String> servers, String topic) { + return this.build(servers, topic, null, null); + } + + /** + * {@inheritDoc} + */ + @Override + public void destroy(String topic) + throws IllegalArgumentException { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + UebTopicSource uebTopicSource; + + synchronized(this) { + if (!uebTopicSources.containsKey(topic)) { + return; + } + + uebTopicSource = uebTopicSources.remove(topic); + } + + uebTopicSource.shutdown(); + } + + /** + * {@inheritDoc} + */ + @Override + public UebTopicSource get(String topic) + throws IllegalArgumentException, IllegalStateException { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + synchronized(this) { + if (uebTopicSources.containsKey(topic)) { + return uebTopicSources.get(topic); + } else { + throw new IllegalStateException("UebTopiceSource for " + topic + " not found"); + } + } + } + + @Override + public synchronized List<UebTopicSource> inventory() { + List<UebTopicSource> readers = + new ArrayList<UebTopicSource>(this.uebTopicSources.values()); + return readers; + } + + @Override + public void destroy() { + List<UebTopicSource> readers = this.inventory(); + for (UebTopicSource reader: readers) { + reader.shutdown(); + } + + synchronized(this) { + this.uebTopicSources.clear(); + } + } + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java new file mode 100644 index 00000000..6fee5ce0 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java @@ -0,0 +1,204 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus.internal; + +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; +import java.util.List; +import java.util.Properties; + +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaConsumer; +import com.att.nsa.mr.client.impl.MRConsumerImpl; +import com.att.nsa.mr.test.clients.ProtocolTypeConstants; +import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; + +/** + * Wrapper around libraries to consume from message bus + * + */ +public interface BusConsumer { + + /** + * fetch messages + * + * @return list of messages + * @throws Exception when error encountered by underlying libraries + */ + public Iterable<String> fetch() throws Exception; + + /** + * close underlying library consumer + */ + public void close(); + + /** + * Cambria based consumer + */ + public static class CambriaConsumerWrapper implements BusConsumer { + /** + * Cambria client + */ + protected CambriaConsumer consumer; + + /** + * Cambria Consumer Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + * @throws GeneralSecurityException + * @throws MalformedURLException + */ + public CambriaConsumerWrapper(List<String> servers, String topic, + String apiKey, String apiSecret, + String consumerGroup, String consumerInstance, + int fetchTimeout, int fetchLimit) + throws IllegalArgumentException { + + ConsumerBuilder builder = + new CambriaClientBuilders.ConsumerBuilder(); + + builder.knownAs(consumerGroup, consumerInstance) + .usingHosts(servers) + .onTopic(topic) + .waitAtServer(fetchTimeout) + .receivingAtMost(fetchLimit); + + if (apiKey != null && !apiKey.isEmpty() && + apiSecret != null && !apiSecret.isEmpty()) { + builder.authenticatedBy(apiKey, apiSecret); + } + + try { + this.consumer = builder.build(); + } catch (MalformedURLException | GeneralSecurityException e) { + throw new IllegalArgumentException(e); + } + } + + /** + * {@inheritDoc} + */ + public Iterable<String> fetch() throws Exception { + return this.consumer.fetch(); + } + + /** + * {@inheritDoc} + */ + public void close() { + this.consumer.close(); + } + + @Override + public String toString() { + return "CambriaConsumerWrapper []"; + } + } + + /** + * MR based consumer + */ + public static class DmaapConsumerWrapper implements BusConsumer { + + /** + * MR Consumer + */ + protected MRConsumerImpl consumer; + + /** + * MR Consumer Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param aafLogin AAF Login + * @param aafPassword AAF Password + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + */ + public DmaapConsumerWrapper(List<String> servers, String topic, + String apiKey, String apiSecret, + String aafLogin, String aafPassword, + String consumerGroup, String consumerInstance, + int fetchTimeout, int fetchLimit) + throws Exception { + + this.consumer = new MRConsumerImpl(servers, topic, + consumerGroup, consumerInstance, + fetchTimeout, fetchLimit, + null, apiKey, apiSecret); + + this.consumer.setUsername(aafLogin); + this.consumer.setPassword(aafPassword); + + this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + + Properties props = new Properties(); + props.setProperty("Protocol", "http"); + this.consumer.setProps(props); + this.consumer.setHost(servers.get(0) + ":3904");; + } + + /** + * {@inheritDoc} + */ + public Iterable<String> fetch() throws Exception { + return this.consumer.fetch(); + } + + /** + * {@inheritDoc} + */ + public void close() { + this.consumer.close(); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder. + append("DmaapConsumerWrapper ["). + append("consumer.getAuthDate()=").append(consumer.getAuthDate()). + append(", consumer.getAuthKey()=").append(consumer.getAuthKey()). + append(", consumer.getHost()=").append(consumer.getHost()). + append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()). + append(", consumer.getUsername()=").append(consumer.getUsername()). + append("]"); + return builder.toString(); + } + } + + +} + + + + diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java new file mode 100644 index 00000000..798bf989 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java @@ -0,0 +1,231 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus.internal; + +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.openecomp.policy.common.logging.eelf.PolicyLogger; +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; +import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; +import com.att.nsa.mr.test.clients.ProtocolTypeConstants; +import com.fasterxml.jackson.annotation.JsonIgnore; + +public interface BusPublisher { + + /** + * sends a message + * + * @param partition id + * @param message the message + * @return true if success, false otherwise + * @throws IllegalArgumentException if no message provided + */ + public boolean send(String partitionId, String message) throws IllegalArgumentException; + + /** + * closes the publisher + */ + public void close(); + + /** + * Cambria based library publisher + */ + public static class CambriaPublisherWrapper implements BusPublisher { + + /** + * The actual Cambria publisher + */ + @JsonIgnore + protected volatile CambriaBatchingPublisher publisher; + + public CambriaPublisherWrapper(List<String> servers, String topic, + String apiKey, + String apiSecret) + throws IllegalArgumentException { + PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder(); + + builder.usingHosts(servers) + .onTopic(topic); + + // Only supported in 0.2.4 version + // .logSendFailuresAfter(DEFAULT_LOG_SEND_FAILURES_AFTER); + + if (apiKey != null && !apiKey.isEmpty() && + apiSecret != null && !apiSecret.isEmpty()) { + builder.authenticatedBy(apiKey, apiSecret); + } + + try { + this.publisher = builder.build(); + } catch (MalformedURLException | GeneralSecurityException e) { + throw new IllegalArgumentException(e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean send(String partitionId, String message) + throws IllegalArgumentException { + if (message == null) + throw new IllegalArgumentException("No message provided"); + + try { + this.publisher.send(partitionId, message); + } catch (Exception e) { + PolicyLogger.warn(CambriaPublisherWrapper.class.getName(), + "SEND of " + message + " IN " + + this + " cannot be performed because of " + + e.getMessage()); + return false; + } + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public void close() { + if (PolicyLogger.isInfoEnabled()) + PolicyLogger.info(CambriaPublisherWrapper.class.getName(), + "CREATION: " + this); + + try { + this.publisher.close(); + } catch (Exception e) { + PolicyLogger.warn(CambriaPublisherWrapper.class.getName(), + "CLOSE on " + this + " FAILED because of " + + e.getMessage()); + } + } + + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("CambriaPublisherWrapper ["). + append("publisher.getPendingMessageCount()="). + append(publisher.getPendingMessageCount()). + append("]"); + return builder.toString(); + } + + } + + /** + * DmaapClient library wrapper + */ + public static class DmaapPublisherWrapper implements BusPublisher { + /** + * MR based Publisher + */ + protected MRSimplerBatchPublisher publisher; + + public DmaapPublisherWrapper(List<String> servers, String topic, + String aafLogin, + String aafPassword) { + + ArrayList<String> dmaapServers = new ArrayList<String>(); + for (String server: servers) { + dmaapServers.add(server + ":3904"); + } + + this.publisher = + new MRSimplerBatchPublisher.Builder(). + againstUrls(dmaapServers). + onTopic(topic). + build(); + + this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + + this.publisher.setUsername(aafLogin); + this.publisher.setPassword(aafPassword); + + Properties props = new Properties(); + props.setProperty("Protocol", "http"); + props.setProperty("contenttype", "application/json"); + + this.publisher.setProps(props); + + this.publisher.setHost(servers.get(0)); + + if (PolicyLogger.isInfoEnabled()) + PolicyLogger.info(DmaapPublisherWrapper.class.getName(), + "CREATION: " + this); + } + + /** + * {@inheritDoc} + */ + @Override + public void close() { + if (PolicyLogger.isInfoEnabled()) + PolicyLogger.info(DmaapPublisherWrapper.class.getName(), + "CREATION: " + this); + + try { + this.publisher.close(1, TimeUnit.SECONDS); + } catch (Exception e) { + PolicyLogger.warn(DmaapPublisherWrapper.class.getName(), + "CLOSE: " + this + " because of " + + e.getMessage()); + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean send(String partitionId, String message) + throws IllegalArgumentException { + if (message == null) + throw new IllegalArgumentException("No message provided"); + + this.publisher.send(partitionId, message); + return true; + + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("DmaapPublisherWrapper ["). + append("publisher.getAuthDate()=").append(publisher.getAuthDate()). + append(", publisher.getAuthKey()=").append(publisher.getAuthKey()). + append(", publisher.getHost()=").append(publisher.getHost()). + append(", publisher.getProtocolFlag()=").append(publisher.getProtocolFlag()). + append(", publisher.getUsername()=").append(publisher.getUsername()). + append(", publisher.getPendingMessageCount()=").append(publisher.getPendingMessageCount()). + append("]"); + return builder.toString(); + } + } + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java new file mode 100644 index 00000000..e36e3afc --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java @@ -0,0 +1,112 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus.internal; + +import java.util.List; + +import org.apache.commons.collections4.queue.CircularFifoQueue; + +import org.openecomp.policy.drools.event.comm.Topic; +import org.openecomp.policy.drools.event.comm.bus.BusTopic; + +public abstract class BusTopicBase implements BusTopic, Topic { + + protected List<String> servers; + + protected String topic; + + protected String apiKey; + protected String apiSecret; + + protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<String>(10); + + public BusTopicBase(List<String> servers, + String topic, + String apiKey, + String apiSecret) + throws IllegalArgumentException { + + if (servers == null || servers.isEmpty()) { + throw new IllegalArgumentException("UEB Server(s) must be provided"); + } + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("An UEB Topic must be provided"); + } + + this.servers = servers; + this.topic = topic; + + this.apiKey = apiKey; + this.apiSecret = apiSecret; + } + + /** + * {@inheritDoc} + */ + @Override + public String getTopic() { + return topic; + } + + /** + * {@inheritDoc} + */ + @Override + public List<String> getServers() { + return servers; + } + + /** + * {@inheritDoc} + */ + @Override + public String getApiKey() { + return apiKey; + } + + /** + * {@inheritDoc} + */ + @Override + public String getApiSecret() { + return apiSecret; + } + + /** + * @return the recentEvents + */ + @Override + public synchronized String[] getRecentEvents() { + String[] events = new String[recentEvents.size()]; + return recentEvents.toArray(events); + } + + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("UebTopicBase [servers=").append(servers).append(", topic=").append(topic).append(", apiKey=") + .append(apiKey).append(", apiSecret=").append(apiSecret).append("]"); + return builder.toString(); + } + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java new file mode 100644 index 00000000..bd88818b --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java @@ -0,0 +1,284 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus.internal; + +import java.util.List; +import java.util.UUID; + +import org.apache.log4j.Logger; + +import org.openecomp.policy.drools.event.comm.bus.BusTopicSink; +import org.openecomp.policy.common.logging.flexlogger.FlexLogger; +import org.openecomp.policy.common.logging.eelf.MessageCodes; + +/** + * Transport Agnostic Bus Topic Sink to carry out the core functionality + * to interact with a sink regardless if it is UEB or DMaaP. + * + */ +public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink { + + /** + * logger + */ + private static org.openecomp.policy.common.logging.flexlogger.Logger logger = + FlexLogger.getLogger(InlineBusTopicSink.class); + + /** + * Not to be converted to PolicyLogger. + * This will contain all in/out traffic and only that in a single file in a concise format. + */ + protected static final Logger networkLogger = Logger.getLogger(NETWORK_LOGGER); + + /** + * The partition key to publish to + */ + protected String partitionId; + + /** + * Am I running? + * reflects invocation of start()/stop() + * !locked & start() => alive + * stop() => !alive + */ + protected volatile boolean alive = false; + + /** + * Am I locked? + * reflects invocation of lock()/unlock() operations + * locked => !alive (but not in the other direction necessarily) + * locked => !offer, !run, !start, !stop (but this last one is obvious + * since locked => !alive) + */ + protected volatile boolean locked = false; + + /** + * message bus publisher + */ + protected BusPublisher publisher; + + /** + * constructor for abstract sink + * + * @param servers servers + * @param topic topic + * @param apiKey api secret + * @param apiSecret api secret + * @param partitionId partition id + * @throws IllegalArgumentException in invalid parameters are passed in + */ + public InlineBusTopicSink(List<String> servers, String topic, + String apiKey, String apiSecret, String partitionId) + throws IllegalArgumentException { + + super(servers, topic, apiKey, apiSecret); + + if (partitionId == null || partitionId.isEmpty()) { + this.partitionId = UUID.randomUUID ().toString(); + } + } + + /** + * Initialize the Bus publisher + */ + public abstract void init(); + + /** + * {@inheritDoc} + */ + @Override + public boolean start() throws IllegalStateException { + + if (logger.isInfoEnabled()) + logger.info("START: " + this); + + synchronized(this) { + + if (this.alive) + return true; + + if (locked) + throw new IllegalStateException(this + " is locked."); + + this.alive = true; + } + + this.init(); + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean stop() { + + BusPublisher publisherCopy; + synchronized(this) { + this.alive = false; + publisherCopy = this.publisher; + this.publisher = null; + } + + if (publisherCopy != null) { + try { + publisherCopy.close(); + } catch (Exception e) { + logger.warn(MessageCodes.EXCEPTION_ERROR, e, "PUBLISHER.CLOSE", this.toString()); + e.printStackTrace(); + } + } else { + logger.warn("No publisher to close: " + this); + return false; + } + + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean lock() { + + if (logger.isInfoEnabled()) + logger.info("LOCK: " + this); + + synchronized (this) { + if (this.locked) + return true; + + this.locked = true; + } + + return this.stop(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean unlock() { + + if (logger.isInfoEnabled()) + logger.info("UNLOCK: " + this); + + synchronized(this) { + if (!this.locked) + return true; + + this.locked = false; + } + + try { + return this.start(); + } catch (Exception e) { + logger.warn("can't start after unlocking " + this + + " because of " + e.getMessage()); + e.printStackTrace(); + return false; + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isLocked() { + return this.locked; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isAlive() { + return this.alive; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean send(String message) throws IllegalArgumentException, IllegalStateException { + + if (message == null || message.isEmpty()) { + throw new IllegalArgumentException("Message to send is empty"); + } + + if (!this.alive) { + throw new IllegalStateException(this + " is stopped"); + } + + try { + synchronized (this) { + this.recentEvents.add(message); + } + + if (networkLogger.isInfoEnabled()) { + networkLogger.info("[OUT|" + this.getTopicCommInfrastructure() + "|" + + this.topic + "]:" + + message); + } + + publisher.send(this.partitionId, message); + } catch (Exception e) { + logger.error("can't start after unlocking " + this + + " because of " + e.getMessage()); + e.printStackTrace(); + return false; + } + + return true; + } + + + /** + * {@inheritDoc} + */ + @Override + public void setPartitionKey(String partitionKey) { + this.partitionId = partitionKey; + } + + /** + * {@inheritDoc} + */ + @Override + public String getPartitionKey() { + return this.partitionId; + } + + /** + * {@inheritDoc} + */ + @Override + public void shutdown() throws IllegalStateException { + this.stop(); + } + + /** + * {@inheritDoc} + */ + @Override + public abstract CommInfrastructure getTopicCommInfrastructure(); + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java new file mode 100644 index 00000000..417c6d47 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java @@ -0,0 +1,84 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus.internal; + +import java.util.List; + +import org.openecomp.policy.drools.event.comm.Topic; +import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSink; +import org.openecomp.policy.common.logging.flexlogger.FlexLogger; +import org.openecomp.policy.common.logging.flexlogger.Logger; + +/** + * This implementation publishes events for the associated DMAAP topic, + * inline with the calling thread. + */ +public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTopicSink { + + protected static Logger logger = + FlexLogger.getLogger(InlineDmaapTopicSink.class); + + protected final String userName; + protected final String password; + + public InlineDmaapTopicSink(List<String> servers, String topic, + String apiKey, String apiSecret, + String userName, String password, + String partitionKey) + throws IllegalArgumentException { + + super(servers, topic, apiKey, apiSecret, partitionKey); + + this.userName = userName; + this.password = password; + } + + + @Override + public void init() { + this.publisher = + new BusPublisher.DmaapPublisherWrapper(this.servers, + this.topic, + this.userName, + this.password); + if (logger.isInfoEnabled()) + logger.info("DMAAP SINK TOPIC created " + this); + } + + /** + * {@inheritDoc} + */ + @Override + public CommInfrastructure getTopicCommInfrastructure() { + return Topic.CommInfrastructure.DMAAP; + } + + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("InlineDmaapTopicSink [userName=").append(userName).append(", password=").append(password) + .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()).append(", toString()=") + .append(super.toString()).append("]"); + return builder.toString(); + } + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java new file mode 100644 index 00000000..2d4b1552 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java @@ -0,0 +1,91 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus.internal; + +import java.util.List; + +import org.openecomp.policy.drools.event.comm.Topic; +import org.openecomp.policy.drools.event.comm.bus.UebTopicSink; +import org.openecomp.policy.common.logging.flexlogger.FlexLogger; + +/** + * This implementation publishes events for the associated UEB topic, + * inline with the calling thread. + */ +public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSink { + + /** + * logger + */ + private static org.openecomp.policy.common.logging.flexlogger.Logger logger = + FlexLogger.getLogger(InlineUebTopicSink.class); + + /** + * Argument-based UEB Topic Writer instantiation + * + * @param servers list of UEB servers available for publishing + * @param topic the topic to publish to + * @param apiKey the api key (optional) + * @param apiSecret the api secret (optional) + * @param partitionId the partition key (optional, autogenerated if not provided) + * + * @throws IllegalArgumentException if invalid arguments are detected + */ + public InlineUebTopicSink(List<String> servers, + String topic, + String apiKey, + String apiSecret, + String partitionId) + throws IllegalArgumentException { + super(servers, topic, apiKey, apiSecret, partitionId); + } + + /** + * Instantiation of internal resources + */ + @Override + public void init() { + + this.publisher = + new BusPublisher.CambriaPublisherWrapper(this.servers, + this.topic, + this.apiKey, + this.apiSecret); + if (logger.isInfoEnabled()) + logger.info("UEB SINK TOPIC created " + this); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("InlineUebTopicSink [getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()) + .append(", toString()=").append(super.toString()).append("]"); + return builder.toString(); + } + + /** + * {@inheritDoc} + */ + @Override + public CommInfrastructure getTopicCommInfrastructure() { + return Topic.CommInfrastructure.UEB; + } +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java new file mode 100644 index 00000000..f37c349e --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -0,0 +1,477 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus.internal; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.apache.log4j.Logger; + +import org.openecomp.policy.drools.event.comm.TopicListener; +import org.openecomp.policy.drools.event.comm.bus.BusTopicSource; +import org.openecomp.policy.common.logging.eelf.MessageCodes; +import org.openecomp.policy.common.logging.eelf.PolicyLogger; + +/** + * This topic source implementation specializes in reading messages + * over a bus topic source and notifying its listeners + */ +public abstract class SingleThreadedBusTopicSource + extends BusTopicBase + implements Runnable, BusTopicSource { + + private String className = SingleThreadedBusTopicSource.class.getName(); + /** + * Not to be converted to PolicyLogger. + * This will contain all instract /out traffic and only that in a single file in a concise format. + */ + protected static final Logger networkLogger = Logger.getLogger(NETWORK_LOGGER); + + /** + * Bus consumer group + */ + protected final String consumerGroup; + + /** + * Bus consumer instance + */ + protected final String consumerInstance; + + /** + * Bus fetch timeout + */ + protected final int fetchTimeout; + + /** + * Bus fetch limit + */ + protected final int fetchLimit; + + /** + * Message Bus Consumer + */ + protected BusConsumer consumer; + + /** + * Am I running? + * reflects invocation of start()/stop() + * !locked & start() => alive + * stop() => !alive + */ + protected volatile boolean alive = false; + + /** + * Am I locked? + * reflects invocation of lock()/unlock() operations + * locked => !alive (but not in the other direction necessarily) + * locked => !offer, !run, !start, !stop (but this last one is obvious + * since locked => !alive) + */ + protected volatile boolean locked = false; + + /** + * Independent thread reading message over my topic + */ + protected Thread busPollerThread; + + /** + * All my subscribers for new message notifications + */ + protected final ArrayList<TopicListener> topicListeners = new ArrayList<TopicListener>(); + + /** + * + * @param servers Bus servers + * @param topic Bus Topic to be monitored + * @param apiKey Bus API Key (optional) + * @param apiSecret Bus API Secret (optional) + * @param consumerGroup Bus Reader Consumer Group + * @param consumerInstance Bus Reader Instance + * @param fetchTimeout Bus fetch timeout + * @param fetchLimit Bus fetch limit + * @throws IllegalArgumentException An invalid parameter passed in + */ + public SingleThreadedBusTopicSource(List<String> servers, + String topic, + String apiKey, + String apiSecret, + String consumerGroup, + String consumerInstance, + int fetchTimeout, + int fetchLimit) + throws IllegalArgumentException { + + super(servers, topic, apiKey, apiSecret); + + if (consumerGroup == null || consumerGroup.isEmpty()) { + this.consumerGroup = UUID.randomUUID ().toString(); + } else { + this.consumerGroup = consumerGroup; + } + + if (consumerInstance == null || consumerInstance.isEmpty()) { + this.consumerInstance = DEFAULT_CONSUMER_INSTANCE; + } else { + this.consumerInstance = consumerInstance; + } + + if (fetchTimeout <= 0) { + this.fetchTimeout = NO_TIMEOUT_MS_FETCH; + } else { + this.fetchTimeout = fetchTimeout; + } + + if (fetchLimit <= 0) { + this.fetchLimit = NO_LIMIT_FETCH; + } else { + this.fetchLimit = fetchLimit; + } + } + + /** + * Initialize the Bus client + */ + public abstract void init() throws Exception; + + /** + * {@inheritDoc} + */ + @Override + public void register(TopicListener topicListener) + throws IllegalArgumentException { + + PolicyLogger.info(className,"REGISTER: " + topicListener + " INTO " + this); + + synchronized(this) { + if (topicListener == null) + throw new IllegalArgumentException("TopicListener must be provided"); + + /* check that this listener is not registered already */ + for (TopicListener listener: this.topicListeners) { + if (listener == topicListener) { + // already registered + return; + } + } + + this.topicListeners.add(topicListener); + } + + try { + this.start(); + } catch (Exception e) { + PolicyLogger.info(className, "new registration of " + topicListener + + ",but can't start source because of " + e.getMessage()); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void unregister(TopicListener topicListener) { + + PolicyLogger.info(className, "UNREGISTER: " + topicListener + " FROM " + this); + + boolean stop = false; + synchronized (this) { + if (topicListener == null) + throw new IllegalArgumentException("TopicListener must be provided"); + + this.topicListeners.remove(topicListener); + stop = (this.topicListeners.isEmpty()); + } + + if (stop) { + this.stop(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean lock() { + PolicyLogger.info(className, "LOCK: " + this); + + synchronized (this) { + if (this.locked) + return true; + + this.locked = true; + } + + return this.stop(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean unlock() { + PolicyLogger.info(className, "UNLOCK: " + this); + + synchronized(this) { + if (!this.locked) + return true; + + this.locked = false; + } + + try { + return this.start(); + } catch (Exception e) { + PolicyLogger.warn("can't start after unlocking " + this + + " because of " + e.getMessage()); + return false; + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean start() throws IllegalStateException { + + PolicyLogger.info(className, "START: " + this); + + synchronized(this) { + + if (alive) { + return true; + } + + if (locked) { + throw new IllegalStateException(this + " is locked."); + } + + if (this.busPollerThread == null || + !this.busPollerThread.isAlive() || + this.consumer == null) { + + try { + this.init(); + this.alive = true; + this.busPollerThread = new Thread(this); + this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic()); + busPollerThread.start(); + } catch (Exception e) { + e.printStackTrace(); + throw new IllegalStateException(e); + } + } + } + + return this.alive; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean stop() { + PolicyLogger.info(className, "STOP: " + this); + + synchronized(this) { + BusConsumer consumerCopy = this.consumer; + + this.alive = false; + this.consumer = null; + + if (consumerCopy != null) { + try { + consumerCopy.close(); + } catch (Exception e) { + PolicyLogger.warn(MessageCodes.EXCEPTION_ERROR, e, "CONSUMER.CLOSE", this.toString()); + } + } + } + + Thread.yield(); + + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isLocked() { + return this.locked; + } + + /** + * broadcast event to all listeners + * + * @param message the event + * @return true if all notifications are performed with no error, false otherwise + */ + protected boolean broadcast(String message) { + + /* take a snapshot of listeners */ + List<TopicListener> snapshotListeners = this.snapshotTopicListeners(); + + boolean success = true; + for (TopicListener topicListener: snapshotListeners) { + try { + topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message); + } catch (Exception e) { + PolicyLogger.warn(this.className, "ERROR notifying " + topicListener.toString() + + " because of " + e.getMessage() + " @ " + this.toString()); + success = false; + } + } + return success; + } + + /** + * take a snapshot of current topic listeners + * + * @return the topic listeners + */ + protected synchronized List<TopicListener> snapshotTopicListeners() { + @SuppressWarnings("unchecked") + List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone(); + return listeners; + } + + /** + * Run thread method for the Bus Reader + */ + @Override + public void run() { + while (this.alive) { + try { + for (String event: this.consumer.fetch()) { + synchronized (this) { + this.recentEvents.add(event); + } + + if (networkLogger.isInfoEnabled()) { + networkLogger.info("IN[" + this.getTopicCommInfrastructure() + "|" + + this.topic + "]:" + + event); + } + + PolicyLogger.info(className, this.topic + " <-- " + event); + broadcast(event); + + if (!this.alive) + break; + } + } catch (Exception e) { + PolicyLogger.error( MessageCodes.EXCEPTION_ERROR, className, e, "CONSUMER.FETCH", this.toString()); + } + } + + PolicyLogger.warn(this.className, "Exiting: " + this); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean offer(String event) { + PolicyLogger.info(className, "OFFER: " + event + " TO " + this); + + if (!this.alive) { + throw new IllegalStateException(this + " is not alive."); + } + + synchronized (this) { + this.recentEvents.add(event); + } + + if (networkLogger.isInfoEnabled()) { + networkLogger.info("IN[" + this.getTopicCommInfrastructure() + "|" + + this.topic + "]:" + + event); + } + + + return broadcast(event); + } + + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("SingleThreadedBusTopicSource [consumerGroup=").append(consumerGroup) + .append(", consumerInstance=").append(consumerInstance).append(", fetchTimeout=").append(fetchTimeout) + .append(", fetchLimit=").append(fetchLimit) + .append(", consumer=").append(this.consumer).append(", alive=") + .append(alive).append(", locked=").append(locked).append(", uebThread=").append(busPollerThread) + .append(", topicListeners=").append(topicListeners.size()).append(", toString()=").append(super.toString()) + .append("]"); + return builder.toString(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isAlive() { + return alive; + } + + /** + * {@inheritDoc} + */ + @Override + public String getConsumerGroup() { + return consumerGroup; + } + + /** + * {@inheritDoc} + */ + @Override + public String getConsumerInstance() { + return consumerInstance; + } + + /** + * {@inheritDoc} + */ + @Override + public void shutdown() throws IllegalStateException { + this.stop(); + this.topicListeners.clear(); + } + + /** + * {@inheritDoc} + */ + @Override + public int getFetchTimeout() { + return fetchTimeout; + } + + /** + * {@inheritDoc} + */ + @Override + public int getFetchLimit() { + return fetchLimit; + } + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java new file mode 100644 index 00000000..e65d44a7 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java @@ -0,0 +1,120 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus.internal; + +import java.util.List; + +import org.openecomp.policy.drools.event.comm.Topic; +import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSource; +import org.openecomp.policy.common.logging.eelf.PolicyLogger; + +/** + * This topic reader implementation specializes in reading messages + * over DMAAP topic and notifying its listeners + */ +public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource + implements DmaapTopicSource, Runnable { + + protected final String userName; + protected final String password; + private String className = SingleThreadedDmaapTopicSource.class.getName(); + + /** + * + * @param servers DMaaP servers + * @param topic DMaaP Topic to be monitored + * @param apiKey DMaaP API Key (optional) + * @param apiSecret DMaaP API Secret (optional) + * @param consumerGroup DMaaP Reader Consumer Group + * @param consumerInstance DMaaP Reader Instance + * @param fetchTimeout DMaaP fetch timeout + * @param fetchLimit DMaaP fetch limit + * @throws IllegalArgumentException An invalid parameter passed in + */ + public SingleThreadedDmaapTopicSource(List<String> servers, String topic, + String apiKey, String apiSecret, + String userName, String password, + String consumerGroup, String consumerInstance, + int fetchTimeout, int fetchLimit) + throws IllegalArgumentException { + + + super(servers, topic, apiKey, apiSecret, + consumerGroup, consumerInstance, + fetchTimeout, fetchLimit); + + this.userName = userName; + this.password = password; + + try { + this.init(); + } catch (Exception e) { + e.printStackTrace(); + throw new IllegalArgumentException(e); + } + } + + + /** + * Initialize the Cambria or MR Client + */ + @Override + public void init() throws Exception { + + if (this.userName == null || this.userName.isEmpty() || + this.password == null || this.password.isEmpty()) { + this.consumer = + new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, + this.apiKey, this.apiSecret, + this.consumerGroup, this.consumerInstance, + this.fetchTimeout, this.fetchLimit); + } else { + this.consumer = + new BusConsumer.DmaapConsumerWrapper(this.servers, this.topic, + this.apiKey, this.apiSecret, + this.userName, this.password, + this.consumerGroup, this.consumerInstance, + this.fetchTimeout, this.fetchLimit); + } + + PolicyLogger.info(className, "CREATION: " + this); + } + + /** + * {@inheritDoc} + */ + @Override + public CommInfrastructure getTopicCommInfrastructure() { + return Topic.CommInfrastructure.DMAAP; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=") + .append((password == null || password.isEmpty()) ? "-" : password.length()) + .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()) + .append(", toString()=").append(super.toString()).append("]"); + return builder.toString(); + } + + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java new file mode 100644 index 00000000..edb55c75 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java @@ -0,0 +1,89 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.event.comm.bus.internal; + +import java.util.List; + +import org.openecomp.policy.drools.event.comm.Topic; +import org.openecomp.policy.drools.event.comm.bus.UebTopicSource; + +/** + * This topic source implementation specializes in reading messages + * over an UEB Bus topic source and notifying its listeners + */ +public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource + implements UebTopicSource { + + /** + * + * @param servers UEB servers + * @param topic UEB Topic to be monitored + * @param apiKey UEB API Key (optional) + * @param apiSecret UEB API Secret (optional) + * @param consumerGroup UEB Reader Consumer Group + * @param consumerInstance UEB Reader Instance + * @param fetchTimeout UEB fetch timeout + * @param fetchLimit UEB fetch limit + * @throws IllegalArgumentException An invalid parameter passed in + */ + public SingleThreadedUebTopicSource(List<String> servers, String topic, + String apiKey, String apiSecret, + String consumerGroup, String consumerInstance, + int fetchTimeout, int fetchLimit) + throws IllegalArgumentException { + + super(servers, topic, apiKey, apiSecret, + consumerGroup, consumerInstance, + fetchTimeout, fetchLimit); + + + this.init(); + } + + /** + * Initialize the Cambria client + */ + @Override + public void init() { + this.consumer = + new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, + this.apiKey, this.apiSecret, + this.consumerGroup, this.consumerInstance, + this.fetchTimeout, this.fetchLimit); + } + + /** + * {@inheritDoc} + */ + @Override + public CommInfrastructure getTopicCommInfrastructure() { + return Topic.CommInfrastructure.UEB; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("SingleThreadedUebTopicSource [getTopicCommInfrastructure()=") + .append(getTopicCommInfrastructure()).append(", toString()=").append(super.toString()).append("]"); + return builder.toString(); + } + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/client/HttpClient.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/client/HttpClient.java new file mode 100644 index 00000000..2e81b2c8 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/client/HttpClient.java @@ -0,0 +1,48 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.openecomp.policy.drools.http.client; + +import javax.ws.rs.core.Response; + +import org.openecomp.policy.drools.properties.Startable; + +public interface HttpClient extends Startable { + + public Response get(String path); + + public Response get(); + + public static <T> T getBody(Response response, Class<T> entityType) { + return response.readEntity(entityType); + } + + public String getName(); + public boolean isHttps(); + public boolean isSelfSignedCerts(); + public String getHostname(); + public int getPort(); + public String getBasePath(); + public String getUserName(); + public String getPassword(); + public String getBaseUrl(); + + + public static final HttpClientFactory factory = new IndexedHttpClientFactory(); +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/client/HttpClientFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/client/HttpClientFactory.java new file mode 100644 index 00000000..53a8c2b2 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/client/HttpClientFactory.java @@ -0,0 +1,185 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.openecomp.policy.drools.http.client; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; + +import org.openecomp.policy.drools.http.client.internal.JerseyClient; +import org.openecomp.policy.drools.properties.PolicyProperties; + +public interface HttpClientFactory { + + public HttpClient build(String name, boolean https, + boolean selfSignedCerts, + String hostname, int port, + String baseUrl, String userName, + String password, boolean managed) + throws Exception; + + public ArrayList<HttpClient> build(Properties properties) throws Exception; + + public HttpClient get(String name); + + public List<HttpClient> inventory(); + + public void destroy(String name); + + public void destroy(); +} + +class IndexedHttpClientFactory implements HttpClientFactory { + + protected HashMap<String, HttpClient> clients = new HashMap<String, HttpClient>(); + + @Override + public synchronized HttpClient build(String name, boolean https, boolean selfSignedCerts, + String hostname, int port, + String baseUrl, String userName, String password, + boolean managed) + throws Exception { + if (clients.containsKey(name)) + return clients.get(name); + + JerseyClient client = + new JerseyClient(name, https, selfSignedCerts, hostname, port, baseUrl, userName, password); + + if (managed) + clients.put(name, client); + + return client; + } + + @Override + public synchronized ArrayList<HttpClient> build(Properties properties) throws Exception { + ArrayList<HttpClient> clientList = new ArrayList<HttpClient>(); + + String clientNames = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES); + if (clientNames == null || clientNames.isEmpty()) { + return clientList; + } + + List<String> clientNameList = + new ArrayList<String>(Arrays.asList(clientNames.split("\\s*,\\s*"))); + + for (String clientName : clientNameList) { + String httpsString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + + clientName + + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + boolean https = false; + if (httpsString != null && !httpsString.isEmpty()) { + https = Boolean.parseBoolean(httpsString); + } + + String hostName = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + + clientName + + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX); + + String servicePortString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + + clientName + + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX); + int port; + try { + if (servicePortString == null || servicePortString.isEmpty()) { + continue; + } + port = Integer.parseInt(servicePortString); + } catch (NumberFormatException nfe) { + nfe.printStackTrace(); + continue; + } + + String baseUrl = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + + clientName + + PolicyProperties.PROPERTY_HTTP_URL_SUFFIX); + + String userName = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + + clientName + + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX); + + String password = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + + clientName + + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX); + + String managedString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + + clientName + + PolicyProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + try { + HttpClient client = + this.build(clientName, https, https, hostName, port, baseUrl, + userName, password, managed); + clientList.add(client); + } catch (Exception e) { + e.printStackTrace(); + } + } + + return clientList; + } + + @Override + public synchronized HttpClient get(String name) { + if (clients.containsKey(name)) { + return clients.get(name); + } + + throw new IllegalArgumentException("Http Client " + name + " not found"); + } + + @Override + public synchronized List<HttpClient> inventory() { + return new ArrayList<HttpClient>(this.clients.values()); + } + + @Override + public synchronized void destroy(String name) { + if (!clients.containsKey(name)) { + return; + } + + HttpClient client = clients.remove(name); + try { + client.shutdown(); + } catch (IllegalStateException e) { + e.printStackTrace(); + } + } + + @Override + public void destroy() { + List<HttpClient> clientsInventory = this.inventory(); + for (HttpClient client: clientsInventory) { + client.shutdown(); + } + + synchronized(this) { + this.clients.clear(); + } + } + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/client/internal/JerseyClient.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/client/internal/JerseyClient.java new file mode 100644 index 00000000..4fa59dc8 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/client/internal/JerseyClient.java @@ -0,0 +1,242 @@ +/*- + * ============LICENSE_START======================================================= + * policy-healthcheck + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.drools.http.client.internal; + +import java.security.SecureRandom; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.core.Response; + +import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature; +import org.openecomp.policy.drools.http.client.HttpClient; + +import com.fasterxml.jackson.annotation.JsonIgnore; + +public class JerseyClient implements HttpClient { + + protected final String name; + protected final boolean https; + protected final boolean selfSignedCerts; + protected final String hostname; + protected final int port; + protected final String basePath; + protected final String userName; + protected final String password; + + protected final Client client; + protected final String baseUrl; + + protected boolean alive = true; + + + public JerseyClient(String name, boolean https, + boolean selfSignedCerts, + String hostname, int port, + String basePath, String userName, + String password) + throws Exception { + + super(); + + if (name == null || name.isEmpty()) + throw new IllegalArgumentException("Name must be provided"); + + if (hostname == null || hostname.isEmpty()) + throw new IllegalArgumentException("Hostname must be provided"); + + if (port <= 0 && port >= 65535) + throw new IllegalArgumentException("Invalid Port provided: " + port); + + this.name = name; + this.https = https; + this.hostname = hostname; + this.port = port; + this.basePath = basePath; + this.userName = userName; + this.password = password; + this.selfSignedCerts = selfSignedCerts; + + StringBuffer tmpBaseUrl = new StringBuffer(); + if (this.https) { + tmpBaseUrl.append("https://"); + ClientBuilder clientBuilder; + SSLContext sslContext = SSLContext.getInstance("TLSv1.2"); + if (this.selfSignedCerts) { + sslContext.init(null, new TrustManager[]{new X509TrustManager() { + @Override + public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {} + @Override + public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {} + @Override + public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } + + }}, new SecureRandom()); + clientBuilder = ClientBuilder.newBuilder().sslContext(sslContext).hostnameVerifier(new HostnameVerifier() { + @Override + public boolean verify(String hostname, SSLSession session) {return true;} + }); + } else { + sslContext.init(null, null, null); + clientBuilder = ClientBuilder.newBuilder().sslContext(sslContext); + } + this.client = clientBuilder.build(); + } else { + tmpBaseUrl.append("http://"); + this.client = ClientBuilder.newClient(); + } + + if (this.userName != null && !this.userName.isEmpty() && + this.password != null && !this.password.isEmpty()) { + HttpAuthenticationFeature authFeature = HttpAuthenticationFeature.basic(userName, password); + this.client.register(authFeature); + } + + this.baseUrl = tmpBaseUrl.append(this.hostname).append(":"). + append(this.port).append("/"). + append((this.basePath == null) ? "" : this.basePath). + toString(); + } + + @Override + public Response get(String path) { + if (path != null && !path.isEmpty()) + return this.client.target(this.baseUrl).path(path).request().get(); + else + return this.client.target(this.baseUrl).request().get(); + } + + @Override + public Response get() { + return this.client.target(this.baseUrl).request().get(); + } + + + @Override + public boolean start() throws IllegalStateException { + return alive; + } + + @Override + public boolean stop() throws IllegalStateException { + return !alive; + } + + @Override + public void shutdown() throws IllegalStateException { + synchronized(this) { + alive = false; + } + + try { + this.client.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public synchronized boolean isAlive() { + return this.alive; + } + + @Override + public String getName() { + return name; + } + + @Override + public boolean isHttps() { + return https; + } + + @Override + public boolean isSelfSignedCerts() { + return selfSignedCerts; + } + + @Override + public String getHostname() { + return hostname; + } + + public int getPort() { + return port; + } + + @Override + public String getBasePath() { + return basePath; + } + + @Override + public String getUserName() { + return userName; + } + + @JsonIgnore + @Override + public String getPassword() { + return password; + } + + @Override + public String getBaseUrl() { + return baseUrl; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("JerseyClient [name="); + builder.append(name); + builder.append(", https="); + builder.append(https); + builder.append(", selfSignedCerts="); + builder.append(selfSignedCerts); + builder.append(", hostname="); + builder.append(hostname); + builder.append(", port="); + builder.append(port); + builder.append(", basePath="); + builder.append(basePath); + builder.append(", userName="); + builder.append(userName); + builder.append(", password="); + builder.append(password); + builder.append(", client="); + builder.append(client); + builder.append(", baseUrl="); + builder.append(baseUrl); + builder.append(", alive="); + builder.append(alive); + builder.append("]"); + return builder.toString(); + } + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/HttpServletServer.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/HttpServletServer.java new file mode 100644 index 00000000..5f5dd787 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/HttpServletServer.java @@ -0,0 +1,81 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.openecomp.policy.drools.http.server; + +import org.openecomp.policy.drools.properties.Startable; + +/** + * A Jetty Server to server REST Requests + */ +public interface HttpServletServer extends Startable { + + /** + * + * @return port + */ + public int getPort(); + + /** + * enables basic authentication with user and password on the the relative path relativeUriPath + * + * @param user + * @param password + * @param relativeUriPath + */ + public void setBasicAuthentication(String user, String password, String relativeUriPath); + + /** + * adds a JAX-RS servlet class to serve REST requests + * + * @param servletPath + * @param restClass + * @throws IllegalArgumentException + * @throws IllegalStateException + */ + public void addServletClass(String servletPath, String restClass) + throws IllegalArgumentException, IllegalStateException; + + /** + * adds a package containing JAX-RS classes to serve REST requests + * + * @param servletPath + * @param restPackage + * @throws IllegalArgumentException + * @throws IllegalStateException + */ + public void addServletPackage(String servletPath, String restPackage) + throws IllegalArgumentException, IllegalStateException; + + /** + * blocking start of the http server + * + * @param maxWaitTime max time to wait for the start to take place + * @return true if start was successful + * + * @throws IllegalArgumentException if arguments are invalid + */ + public boolean waitedStart(long maxWaitTime) throws IllegalArgumentException; + + + /** + * factory for managing and tracking DMAAP sources + */ + public static HttpServletServerFactory factory = new IndexedHttpServletServerFactory(); +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/HttpServletServerFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/HttpServletServerFactory.java new file mode 100644 index 00000000..bd5ae242 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/HttpServletServerFactory.java @@ -0,0 +1,206 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.openecomp.policy.drools.http.server; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; + +import org.openecomp.policy.common.logging.flexlogger.FlexLogger; +import org.openecomp.policy.common.logging.flexlogger.Logger; +import org.openecomp.policy.drools.http.server.internal.JettyJerseyServer; +import org.openecomp.policy.drools.properties.PolicyProperties; + +/** + * Jetty Server Factory + */ +public interface HttpServletServerFactory { + + public HttpServletServer build(String name, String host, int port, String contextPath, boolean managed) + throws IllegalArgumentException; + + public ArrayList<HttpServletServer> build(Properties properties) throws IllegalArgumentException; + + public HttpServletServer get(int port); + public List<HttpServletServer> inventory(); + public void destroy(int port); + public void destroy(); +} + +class IndexedHttpServletServerFactory implements HttpServletServerFactory { + + protected static Logger logger = FlexLogger.getLogger(IndexedHttpServletServerFactory.class); + + protected HashMap<Integer, JettyJerseyServer> servers = new HashMap<Integer, JettyJerseyServer>(); + + @Override + public synchronized HttpServletServer build(String name, String host, int port, + String contextPath, boolean managed) + throws IllegalArgumentException { + + if (servers.containsKey(port)) + return servers.get(port); + + JettyJerseyServer server = new JettyJerseyServer(name, host, port, contextPath); + if (managed) + servers.put(port, server); + + return server; + } + + @Override + public synchronized ArrayList<HttpServletServer> build(Properties properties) + throws IllegalArgumentException { + + ArrayList<HttpServletServer> serviceList = new ArrayList<HttpServletServer>(); + + String serviceNames = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES); + if (serviceNames == null || serviceNames.isEmpty()) { + logger.warn("No topic for HTTP Service " + properties); + return serviceList; + } + + List<String> serviceNameList = + new ArrayList<String>(Arrays.asList(serviceNames.split("\\s*,\\s*"))); + + for (String serviceName : serviceNameList) { + String servicePortString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX); + + int servicePort; + try { + if (servicePortString == null || servicePortString.isEmpty()) { + if (logger.isWarnEnabled()) + logger.warn("No HTTP port for service in " + serviceName); + continue; + } + servicePort = Integer.parseInt(servicePortString); + } catch (NumberFormatException nfe) { + if (logger.isWarnEnabled()) + logger.warn("No HTTP port for service in " + serviceName); + continue; + } + + String hostName = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX); + + String contextUriPath = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + + PolicyProperties.PROPERTY_HTTP_CONTEXT_URIPATH_SUFFIX); + + String userName = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX); + + String password = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX); + + String authUriPath = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + + PolicyProperties.PROPERTY_HTTP_AUTH_URIPATH_SUFFIX); + + String restClasses = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + + PolicyProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX); + + String restPackages = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + + PolicyProperties.PROPERTY_HTTP_REST_PACKAGES_SUFFIX); + String restUriPath = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + + PolicyProperties.PROPERTY_HTTP_REST_URIPATH_SUFFIX); + + String managedString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + + PolicyProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + HttpServletServer service = build(serviceName, hostName, servicePort, contextUriPath, managed); + if (userName != null && !userName.isEmpty() && password != null && !password.isEmpty()) { + service.setBasicAuthentication(userName, password, authUriPath); + } + + if (restClasses != null && !restClasses.isEmpty()) { + List<String> restClassesList = + new ArrayList<String>(Arrays.asList(restClasses.split("\\s*,\\s*"))); + for (String restClass : restClassesList) + service.addServletClass(restUriPath, restClass); + } + + if (restPackages != null && !restPackages.isEmpty()) { + List<String> restPackageList = + new ArrayList<String>(Arrays.asList(restPackages.split("\\s*,\\s*"))); + for (String restPackage : restPackageList) + service.addServletPackage(restUriPath, restPackage); + } + + serviceList.add(service); + } + + return serviceList; + } + + @Override + public synchronized HttpServletServer get(int port) throws IllegalArgumentException { + + if (servers.containsKey(port)) { + return servers.get(port); + } + + throw new IllegalArgumentException("Http Server for " + port + " not found"); + } + + @Override + public synchronized List<HttpServletServer> inventory() { + return new ArrayList<HttpServletServer>(this.servers.values()); + } + + @Override + public synchronized void destroy(int port) throws IllegalArgumentException, IllegalStateException { + + if (!servers.containsKey(port)) { + return; + } + + HttpServletServer server = servers.remove(port); + server.shutdown(); + } + + @Override + public synchronized void destroy() throws IllegalArgumentException, IllegalStateException { + List<HttpServletServer> servers = this.inventory(); + for (HttpServletServer server: servers) { + server.shutdown(); + } + + synchronized(this) { + this.servers.clear(); + } + } + +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/internal/JettyJerseyServer.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/internal/JettyJerseyServer.java new file mode 100644 index 00000000..4914a4cb --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/internal/JettyJerseyServer.java @@ -0,0 +1,130 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.openecomp.policy.drools.http.server.internal; + +import java.util.ArrayList; +import java.util.HashMap; + +import org.eclipse.jetty.servlet.ServletHolder; + +import org.openecomp.policy.common.logging.flexlogger.FlexLogger; +import org.openecomp.policy.common.logging.flexlogger.Logger; + +/** + * REST Jetty Server using Jersey + */ +public class JettyJerseyServer extends JettyServletServer { + + protected static final String JERSEY_PACKAGES_PARAM = "jersey.config.server.provider.packages"; + protected static final String JERSEY_CLASSNAMES_PARAM = "jersey.config.server.provider.classnames"; + + protected static Logger logger = FlexLogger.getLogger(JettyJerseyServer.class); + + protected ArrayList<String> packages = new ArrayList<String>(); + protected HashMap<String, ServletHolder> servlets = + new HashMap<String, ServletHolder>(); + + public JettyJerseyServer(String name, String host, int port, String contextPath) + throws IllegalArgumentException { + super(name, host, port, contextPath); + } + + protected synchronized ServletHolder getServlet(String servletPath) + throws IllegalArgumentException { + + if (servletPath == null || servletPath.isEmpty()) + servletPath = "/*"; + + ServletHolder jerseyServlet = servlets.get(servletPath); + if (jerseyServlet == null) { + jerseyServlet = context.addServlet + (org.glassfish.jersey.servlet.ServletContainer.class, servletPath); + jerseyServlet.setInitOrder(0); + String initPackages = + jerseyServlet.getInitParameter(JERSEY_PACKAGES_PARAM); + if (initPackages == null) { + jerseyServlet.setInitParameter( + JERSEY_PACKAGES_PARAM, + "com.jersey.jaxb,com.fasterxml.jackson.jaxrs.json"); + } + this.servlets.put(servletPath, jerseyServlet); + } + + return jerseyServlet; + } + + @Override + public synchronized void addServletPackage(String servletPath, String restPackage) + throws IllegalArgumentException, IllegalStateException { + + if (restPackage == null || restPackage.isEmpty()) + throw new IllegalArgumentException("No discoverable REST package provided"); + + ServletHolder jerseyServlet = this.getServlet(servletPath); + if (jerseyServlet == null) + throw new IllegalStateException("Unexpected, no Jersey Servlet class"); + + String initPackages = + jerseyServlet.getInitParameter(JERSEY_PACKAGES_PARAM); + if (initPackages == null) + throw new IllegalStateException("Unexpected, no Init Parameters loaded"); + + jerseyServlet.setInitParameter( + JERSEY_PACKAGES_PARAM, + initPackages + "," + restPackage); + + if (logger.isDebugEnabled()) + logger.debug(this + "Added REST Package: " + jerseyServlet.dump()); + } + + @Override + public synchronized void addServletClass(String servletPath, String restClass) + throws IllegalArgumentException, IllegalStateException { + + if (restClass == null || restClass.isEmpty()) + throw new IllegalArgumentException("No discoverable REST class provided"); + + ServletHolder jerseyServlet = this.getServlet(servletPath); + if (jerseyServlet == null) + throw new IllegalStateException("Unexpected, no Jersey Servlet class"); + + String initClasses = + jerseyServlet.getInitParameter(JERSEY_CLASSNAMES_PARAM); + if (initClasses == null) + initClasses = restClass; + else + initClasses = initClasses + "," + restClass; + + jerseyServlet.setInitParameter( + JERSEY_CLASSNAMES_PARAM, + initClasses); + + if (logger.isDebugEnabled()) + logger.debug(this + "Added REST Class: " + jerseyServlet.dump()); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("JerseyJettyServer [packages=").append(packages).append(", servlets=").append(servlets) + .append(", toString()=").append(super.toString()).append("]"); + return builder.toString(); + } +} diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/internal/JettyServletServer.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/internal/JettyServletServer.java new file mode 100644 index 00000000..74360e80 --- /dev/null +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/internal/JettyServletServer.java @@ -0,0 +1,353 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.openecomp.policy.drools.http.server.internal; + +import org.eclipse.jetty.security.ConstraintMapping; +import org.eclipse.jetty.security.ConstraintSecurityHandler; +import org.eclipse.jetty.security.HashLoginService; +import org.eclipse.jetty.security.authentication.BasicAuthenticator; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.util.security.Constraint; +import org.eclipse.jetty.util.security.Credential; + +import org.openecomp.policy.common.logging.eelf.MessageCodes; +import org.openecomp.policy.common.logging.flexlogger.FlexLogger; +import org.openecomp.policy.common.logging.flexlogger.Logger; +import org.openecomp.policy.drools.http.server.HttpServletServer; + +import com.fasterxml.jackson.annotation.JsonIgnore; + +/** + * Http Server implementation using Embedded Jetty + */ +public abstract class JettyServletServer implements HttpServletServer, Runnable { + + private static Logger logger = FlexLogger.getLogger(JettyServletServer.class); + + protected final String name; + + protected final String host; + protected final int port; + + protected String user; + protected String password; + + protected final String contextPath; + + protected final Server jettyServer; + protected final ServletContextHandler context; + protected final ServerConnector connector; + + protected volatile Thread jettyThread; + + protected Object startCondition = new Object(); + + public JettyServletServer(String name, String host, int port, String contextPath) + throws IllegalArgumentException { + + if (name == null || name.isEmpty()) + name = "http-" + port; + + if (port <= 0 && port >= 65535) + throw new IllegalArgumentException("Invalid Port provided: " + port); + + if (host == null || host.isEmpty()) + host = "localhost"; + + if (contextPath == null || contextPath.isEmpty()) + contextPath = "/"; + + this.name = name; + + this.host = host; + this.port = port; + + this.contextPath = contextPath; + + this.context = new ServletContextHandler(ServletContextHandler.SESSIONS); + this.context.setContextPath(contextPath); + + this.jettyServer = new Server(); + + this.connector = new ServerConnector(this.jettyServer); + this.connector.setName(name); + this.connector.setReuseAddress(true); + this.connector.setPort(port); + this.connector.setHost(host); + + this.jettyServer.addConnector(this.connector); + this.jettyServer.setHandler(context); + } + + /** + * {@inheritDoc} + */ + @Override + public void setBasicAuthentication(String user, String password, String servletPath) { + if (user == null || user.isEmpty() || password == null || password.isEmpty()) + throw new IllegalArgumentException("Missing user and/or password"); + + if (servletPath == null || servletPath.isEmpty()) + servletPath = "/*"; + + HashLoginService hashLoginService = new HashLoginService(); + hashLoginService.putUser(user, + Credential.getCredential(password), + new String[] {"user"}); + hashLoginService.setName(this.connector.getName() + "-login-service"); + + Constraint constraint = new Constraint(); + constraint.setName(Constraint.__BASIC_AUTH); + constraint.setRoles(new String[]{"user"}); + constraint.setAuthenticate(true); + + ConstraintMapping constraintMapping = new ConstraintMapping(); + constraintMapping.setConstraint(constraint); + constraintMapping.setPathSpec(servletPath); + + ConstraintSecurityHandler securityHandler = new ConstraintSecurityHandler(); + securityHandler.setAuthenticator(new BasicAuthenticator()); + securityHandler.setRealmName(this.connector.getName() + "-realm"); + securityHandler.addConstraintMapping(constraintMapping); + securityHandler.setLoginService(hashLoginService); + + this.context.setSecurityHandler(securityHandler); + + this.user = user; + this.password = password; + } + + /** + * Jetty Server Execution + */ + @Override + public void run() { + try { + if (logger.isInfoEnabled()) + logger.info(this + " STARTING " + this.jettyServer.dump()); + + this.jettyServer.start(); + + synchronized(this.startCondition) { + this.startCondition.notifyAll(); + } + + this.jettyServer.join(); + } catch (Exception e) { + logger.warn(MessageCodes.EXCEPTION_ERROR, e, + "Error found while running management server", this.toString()); + } + } + + @Override + public boolean waitedStart(long maxWaitTime) throws IllegalArgumentException { + + if (maxWaitTime < 0) + throw new IllegalArgumentException("max-wait-time cannot be negative"); + + long pendingWaitTime = maxWaitTime; + + if (!this.start()) + return false; + + synchronized (this.startCondition) { + + while (!this.jettyServer.isRunning()) { + try { + long startTs = System.currentTimeMillis(); + + this.startCondition.wait(pendingWaitTime); + + if (maxWaitTime == 0) + /* spurious notification */ + continue; + + long endTs = System.currentTimeMillis(); + pendingWaitTime = pendingWaitTime - (endTs - startTs); + + if (logger.isInfoEnabled()) + logger.info(this + "Pending time is " + pendingWaitTime + + " ms."); + + if (pendingWaitTime <= 0) + return false; + + } catch (InterruptedException e) { + logger.warn("waited-start has been interrupted"); + return false; + } + } + + return (this.jettyServer.isRunning()); + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean start() throws IllegalStateException { + if (logger.isDebugEnabled()) + logger.debug(this + "START"); + + synchronized(this) { + if (jettyThread == null || + !this.jettyThread.isAlive()) { + + this.jettyThread = new Thread(this); + this.jettyThread.setName(this.name + "-" + this.port); + this.jettyThread.start(); + } + } + + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean stop() throws IllegalStateException { + logger.info(this + "STOP"); + + synchronized(this) { + if (jettyThread == null) { + return true; + } + + if (!jettyThread.isAlive()) { + this.jettyThread = null; + } + + try { + this.connector.stop(); + } catch (Exception e) { + logger.error(MessageCodes.EXCEPTION_ERROR, e, + "Error while stopping management server", this.toString()); + e.printStackTrace(); + } + + try { + this.jettyServer.stop(); + } catch (Exception e) { + logger.error(MessageCodes.EXCEPTION_ERROR, e, + "Error while stopping management server", this.toString()); + return false; + } + + Thread.yield(); + } + + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public void shutdown() throws IllegalStateException { + logger.info(this + "SHUTDOWN"); + + this.stop(); + + if (this.jettyThread == null) + return; + + Thread jettyThreadCopy = this.jettyThread; + + if (jettyThreadCopy.isAlive()) { + try { + jettyThreadCopy.join(1000L); + } catch (InterruptedException e) { + logger.warn(MessageCodes.EXCEPTION_ERROR, e, + "Error while shutting down management server", this.toString()); + } + if (!jettyThreadCopy.isInterrupted()) { + try { + jettyThreadCopy.interrupt(); + } catch(Exception e) { + // do nothing + logger.warn("exception while shutting down (OK)"); + } + } + } + + this.jettyServer.destroy(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isAlive() { + if (this.jettyThread != null) + return this.jettyThread.isAlive(); + + return false; + } + + @Override + public int getPort() { + return this.port; + } + + /** + * @return the name + */ + public String getName() { + return name; + } + + /** + * @return the host + */ + public String getHost() { + return host; + } + + /** + * @return the user + */ + public String getUser() { + return user; + } + + /** + * @return the password + */ + @JsonIgnore + public String getPassword() { + return password; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("JettyServer [name=").append(name).append(", host=").append(host).append(", port=").append(port) + .append(", user=").append(user).append(", password=").append((password != null)).append(", contextPath=") + .append(contextPath).append(", jettyServer=").append(jettyServer).append(", context=").append(this.context) + .append(", connector=").append(connector).append(", jettyThread=").append(jettyThread) + .append("]"); + return builder.toString(); + } + +} |