diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org')
55 files changed, 7619 insertions, 0 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/FilterableTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/FilterableTopicSource.java new file mode 100644 index 00000000..27f4ceb7 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/FilterableTopicSource.java @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm; + +/** + * TopicSource that supports server-side filtering. + */ +public interface FilterableTopicSource extends TopicSource { + + /** + * Sets the server-side filter. + * + * @param filter new filter value, or {@code null} + * @throws UnsupportedOperationException if the consumer does not support + * server-side filtering + * @throws IllegalArgumentException if the consumer cannot be built with the + * new filter + */ + public void setFilter(String filter); + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java new file mode 100644 index 00000000..a6d70a1e --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java @@ -0,0 +1,89 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm; + +import java.util.List; + +import org.onap.policy.common.capabilities.Lockable; +import org.onap.policy.common.capabilities.Startable; + + +/** + * Essential Topic Data + */ +public interface Topic extends TopicRegisterable, Startable, Lockable { + + /** + * network logger + */ + public static final String NETWORK_LOGGER = "network"; + + /** + * Underlying Communication infrastructure Types + */ + public enum CommInfrastructure { + /** + * UEB Communication Infrastructure + */ + UEB, + /** + * DMAAP Communication Infrastructure + */ + DMAAP, + /** + * NOOP for internal use only + */ + NOOP, + /** + * REST Communication Infrastructure + */ + REST + } + + /** + * gets the topic name + * + * @return topic name + */ + public String getTopic(); + + /** + * gets the communication infrastructure type + * + * @return + */ + public CommInfrastructure getTopicCommInfrastructure(); + + /** + * return list of servers + * + * @return bus servers + */ + public List<String> getServers(); + + /** + * get the more recent events in this topic entity + * + * @return list of most recent events + */ + public String[] getRecentEvents(); + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java new file mode 100644 index 00000000..03e6776a --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java @@ -0,0 +1,229 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm; + +import java.util.List; +import java.util.Properties; + +import org.onap.policy.common.capabilities.Lockable; +import org.onap.policy.common.capabilities.Startable; +import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; + +/** + * Abstraction to managed the system's Networked Topic Endpoints, sources of all events input into + * the System. + */ +public interface TopicEndpoint extends Startable, Lockable { + + /** + * Add Topic Sources to the communication infrastructure initialized per properties + * + * @param properties properties for Topic Source construction + * @return a generic Topic Source + * @throws IllegalArgumentException when invalid arguments are provided + */ + public List<TopicSource> addTopicSources(Properties properties); + + /** + * Add Topic Sinks to the communication infrastructure initialized per properties + * + * @param properties properties for Topic Sink construction + * @return a generic Topic Sink + * @throws IllegalArgumentException when invalid arguments are provided + */ + public List<TopicSink> addTopicSinks(Properties properties); + + /** + * gets all Topic Sources + * + * @return the Topic Source List + */ + List<TopicSource> getTopicSources(); + + /** + * get the Topic Sources for the given topic name + * + * @param topicName the topic name + * + * @return the Topic Source List + * @throws IllegalStateException if the entity is in an invalid state + * @throws IllegalArgumentException if invalid parameters are present + */ + public List<TopicSource> getTopicSources(List<String> topicNames); + + /** + * gets the Topic Source for the given topic name and underlying communication infrastructure + * type + * + * @param commType communication infrastructure type + * @param topicName the topic name + * + * @return the Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + * @throws UnsupportedOperationException if the operation is not supported. + */ + public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName); + + /** + * get the UEB Topic Source for the given topic name + * + * @param topicName the topic name + * + * @return the UEB Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public UebTopicSource getUebTopicSource(String topicName); + + /** + * get the DMAAP Topic Source for the given topic name + * + * @param topicName the topic name + * + * @return the DMAAP Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSource getDmaapTopicSource(String topicName); + + /** + * get the Topic Sinks for the given topic name + * + * @param topicNames the topic names + * @return the Topic Sink List + * @throws IllegalStateException + * @throws IllegalArgumentException + */ + public List<TopicSink> getTopicSinks(List<String> topicNames); + + /** + * get the Topic Sinks for the given topic name and underlying communication infrastructure type + * + * @param topicName the topic name + * @param commType communication infrastructure type + * + * @return the Topic Sink List + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicWriters for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName); + + /** + * get the Topic Sinks for the given topic name and all the underlying communication + * infrastructure type + * + * @param topicName the topic name + * @param commType communication infrastructure type + * + * @return the Topic Sink List + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicWriters for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public List<TopicSink> getTopicSinks(String topicName); + + /** + * get the UEB Topic Source for the given topic name + * + * @param topicName the topic name + * + * @return the Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public UebTopicSink getUebTopicSink(String topicName); + + /** + * get the no-op Topic Sink for the given topic name + * + * @param topicName the topic name + * + * @return the Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public NoopTopicSink getNoopTopicSink(String topicName); + + /** + * get the DMAAP Topic Source for the given topic name + * + * @param topicName the topic name + * + * @return the Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSink getDmaapTopicSink(String topicName); + + /** + * gets only the UEB Topic Sources + * + * @return the UEB Topic Source List + */ + public List<UebTopicSource> getUebTopicSources(); + + /** + * gets only the DMAAP Topic Sources + * + * @return the DMAAP Topic Source List + */ + public List<DmaapTopicSource> getDmaapTopicSources(); + + /** + * gets all Topic Sinks + * + * @return the Topic Sink List + */ + public List<TopicSink> getTopicSinks(); + + /** + * gets only the UEB Topic Sinks + * + * @return the UEB Topic Sink List + */ + public List<UebTopicSink> getUebTopicSinks(); + + /** + * gets only the DMAAP Topic Sinks + * + * @return the DMAAP Topic Sink List + */ + public List<DmaapTopicSink> getDmaapTopicSinks(); + + /** + * gets only the NOOP Topic Sinks + * + * @return the NOOP Topic Sinks List + */ + public List<NoopTopicSink> getNoopTopicSinks(); +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicListener.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicListener.java new file mode 100644 index 00000000..d1695d39 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicListener.java @@ -0,0 +1,38 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm; + +/** + * Listener for event messages entering the Policy Engine + */ +@FunctionalInterface +public interface TopicListener { + + /** + * Notification of a new Event over a given Topic + * + * @param commType communication infrastructure type + * @param topic topic name + * @param event event message as a string + */ + public void onTopicEvent(Topic.CommInfrastructure commType, String topic, String event); + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicRegisterable.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicRegisterable.java new file mode 100644 index 00000000..c8bd0bd3 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicRegisterable.java @@ -0,0 +1,42 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm; + +/** + * Marks a Topic entity as registerable + */ +public interface TopicRegisterable { + + /** + * Register for notification of events with this Topic Entity + * + * @param topicListener the listener of events + */ + public void register(TopicListener topicListener); + + /** + * Unregisters for notification of events with this Topic Entity + * + * @param topicListener the listener of events + */ + public void unregister(TopicListener topicListener); + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicSink.java new file mode 100644 index 00000000..5127da9d --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicSink.java @@ -0,0 +1,40 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm; + +/** + * Marks a given Topic Endpoint as able to send messages over a topic + */ +public interface TopicSink extends Topic { + + /** + * Sends a string message over this Topic Endpoint + * + * @param message message to send + * + * @return true if the send operation succeeded, false otherwise + * @throws IllegalArgumentException an invalid message has been provided + * @throws IllegalStateException the entity is in an state that prevents + * it from sending messages, for example, locked or stopped. + */ + public boolean send(String message); + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicSource.java new file mode 100644 index 00000000..fb39764a --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicSource.java @@ -0,0 +1,37 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm; + +/** + * Marker for a Topic Entity, indicating that the entity is able to read + * over a topic + */ +public interface TopicSource extends Topic { + + /** + * pushes an event into the source programatically + * + * @param event the event in json format + * @return true if it can be processed correctly, false otherwise + */ + public boolean offer(String event); + +}
\ No newline at end of file diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/ApiKeyEnabled.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/ApiKeyEnabled.java new file mode 100644 index 00000000..3a57d2be --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/ApiKeyEnabled.java @@ -0,0 +1,36 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +/** + * API + */ +public interface ApiKeyEnabled { + /** + * @return api key + */ + public String getApiKey(); + + /** + * @return api secret + */ + public String getApiSecret(); +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java new file mode 100644 index 00000000..d06983d8 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java @@ -0,0 +1,47 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import org.onap.policy.common.endpoints.event.comm.TopicSink; + +/** + * Topic Sink over Bus Infrastructure (DMAAP/UEB) + */ +public interface BusTopicSink extends ApiKeyEnabled, TopicSink { + /** + * Log Failures after X number of retries + */ + public static final int DEFAULT_LOG_SEND_FAILURES_AFTER = 1; + + /** + * Sets the UEB partition key for published messages + * + * @param partitionKey the partition key + */ + public void setPartitionKey(String partitionKey); + + /** + * return the partition key in used by the system to publish messages + * + * @return the partition key + */ + public String getPartitionKey(); +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java new file mode 100644 index 00000000..2891513d --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java @@ -0,0 +1,78 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import org.onap.policy.common.endpoints.event.comm.TopicSource; + +/** + * Generic Topic Source for UEB/DMAAP Communication Infrastructure + * + */ +public interface BusTopicSource extends ApiKeyEnabled, TopicSource { + + /** + * Default Timeout fetching in milliseconds + */ + public static int DEFAULT_TIMEOUT_MS_FETCH = 15000; + + /** + * Default maximum number of messages fetch at the time + */ + public static int DEFAULT_LIMIT_FETCH = 100; + + /** + * Definition of No Timeout fetching + */ + public static int NO_TIMEOUT_MS_FETCH = -1; + + /** + * Definition of No limit fetching + */ + public static int NO_LIMIT_FETCH = -1; + + /** + * gets the consumer group + * + * @return consumer group + */ + public String getConsumerGroup(); + + /** + * gets the consumer instance + * + * @return consumer instance + */ + public String getConsumerInstance(); + + /** + * gets the fetch timeout + * + * @return fetch timeout + */ + public int getFetchTimeout(); + + /** + * gets the fetch limit + * + * @return fetch limit + */ + public int getFetchLimit(); +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java new file mode 100644 index 00000000..845945cd --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java @@ -0,0 +1,24 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +public interface DmaapTopicSink extends BusTopicSink { +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java new file mode 100644 index 00000000..2e3ecf29 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java @@ -0,0 +1,135 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * DMAAP Topic Sink Factory + */ +public interface DmaapTopicSinkFactory { + public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS"; + public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT"; + public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS"; + public final String DME2_VERSION_PROPERTY = "Version"; + public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer"; + public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName"; + public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath"; + public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired"; + + /** + * Instantiates a new DMAAP Topic Sink + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * @param userName AAF user name + * @param password AAF password + * @param partitionKey Consumer Group + * @param environment DME2 environment + * @param aftEnvironment DME2 AFT environment + * @param partner DME2 Partner + * @param latitude DME2 latitude + * @param longitude DME2 longitude + * @param additionalProps additional properties to pass to DME2 + * @param managed is this sink endpoint managed? + * + * @return an DMAAP Topic Sink + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, + String password, String partitionKey, String environment, String aftEnvironment, String partner, + String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps, + boolean allowSelfSignedCerts); + + /** + * Instantiates a new DMAAP Topic Sink + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * @param userName AAF user name + * @param password AAF password + * @param partitionKey Consumer Group + * @param managed is this sink endpoint managed? + * + * @return an DMAAP Topic Sink + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, + String password, String partitionKey, boolean managed, boolean useHttps, boolean allowSelfSignedCerts); + + /** + * Creates an DMAAP Topic Sink based on properties files + * + * @param properties Properties containing initialization values + * + * @return an DMAAP Topic Sink + * @throws IllegalArgumentException if invalid parameters are present + */ + public List<DmaapTopicSink> build(Properties properties); + + /** + * Instantiates a new DMAAP Topic Sink + * + * @param servers list of servers + * @param topic topic name + * + * @return an DMAAP Topic Sink + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSink build(List<String> servers, String topic); + + /** + * Destroys an DMAAP Topic Sink based on a topic + * + * @param topic topic name + * @throws IllegalArgumentException if invalid parameters are present + */ + public void destroy(String topic); + + /** + * gets an DMAAP Topic Sink based on topic name + * + * @param topic the topic name + * + * @return an DMAAP Topic Sink with topic name + * @throws IllegalArgumentException if an invalid topic is provided + * @throws IllegalStateException if the DMAAP Topic Reader is an incorrect state + */ + public DmaapTopicSink get(String topic); + + /** + * Provides a snapshot of the DMAAP Topic Sinks + * + * @return a list of the DMAAP Topic Sinks + */ + public List<DmaapTopicSink> inventory(); + + /** + * Destroys all DMAAP Topic Sinks + */ + public void destroy(); +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java new file mode 100644 index 00000000..b50c752f --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java @@ -0,0 +1,24 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +public interface DmaapTopicSource extends BusTopicSource { +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java new file mode 100644 index 00000000..adfb4b42 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java @@ -0,0 +1,158 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * DMAAP Topic Source Factory + */ +public interface DmaapTopicSourceFactory { + public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS"; + public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT"; + public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS"; + public final String DME2_VERSION_PROPERTY = "Version"; + public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer"; + public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName"; + public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath"; + public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired"; + + /** + * Creates an DMAAP Topic Source based on properties files + * + * @param properties Properties containing initialization values + * + * @return an DMAAP Topic Source + * @throws IllegalArgumentException if invalid parameters are present + */ + public List<DmaapTopicSource> build(Properties properties); + + /** + * Instantiates a new DMAAP Topic Source + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * @param userName user name + * @param password password + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Read Fetch Timeout + * @param fetchLimit Fetch Limit + * @param managed is this endpoind managed? + * @param useHttps does the connection use HTTPS? + * @param allowSelfSignedCerts does connection allow self-signed certificates? + * + * @return an DMAAP Topic Source + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, + String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, + boolean managed, boolean useHttps, boolean allowSelfSignedCerts); + + /** + * Instantiates a new DMAAP Topic Source + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * @param userName user name + * @param password password + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Read Fetch Timeout + * @param fetchLimit Fetch Limit + * @param environment DME2 environment + * @param aftEnvironment DME2 AFT environment + * @param partner DME2 Partner + * @param latitude DME2 latitude + * @param longitude DME2 longitude + * @param additionalProps additional properties to pass to DME2 + * @param managed is this endpoind managed? + * @param useHttps does the connection use HTTPS? + * @param allowSelfSignedCerts does connection allow self-signed certificates? + * + * @return an DMAAP Topic Source + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, + String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, + String environment, String aftEnvironment, String partner, String latitude, String longitude, + Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts); + + /** + * Instantiates a new DMAAP Topic Source + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * + * @return an DMAAP Topic Source + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret); + + /** + * Instantiates a new DMAAP Topic Source + * + * @param servers list of servers + * @param topic topic name + * + * @return an DMAAP Topic Source + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSource build(List<String> servers, String topic); + + /** + * Destroys an DMAAP Topic Source based on a topic + * + * @param topic topic name + * @throws IllegalArgumentException if invalid parameters are present + */ + public void destroy(String topic); + + /** + * Destroys all DMAAP Topic Sources + */ + public void destroy(); + + /** + * gets an DMAAP Topic Source based on topic name + * + * @param topic the topic name + * @return an DMAAP Topic Source with topic name + * @throws IllegalArgumentException if an invalid topic is provided + * @throws IllegalStateException if the DMAAP Topic Source is an incorrect state + */ + public DmaapTopicSource get(String topic); + + /** + * Provides a snapshot of the DMAAP Topic Sources + * + * @return a list of the DMAAP Topic Sources + */ + public List<DmaapTopicSource> inventory(); +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java new file mode 100644 index 00000000..c6cbf343 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java @@ -0,0 +1,127 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.List; + +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.internal.TopicBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * NOOP topic sink + */ +public class NoopTopicSink extends TopicBase implements TopicSink { + + /** + * logger + */ + private static Logger logger = LoggerFactory.getLogger(NoopTopicSink.class); + + /** + * net logger + */ + private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER); + + /** + * constructor + * + * @param servers servers + * @param topic topic + * @throws IllegalArgumentException if an invalid argument has been passed in + */ + public NoopTopicSink(List<String> servers, String topic) { + super(servers, topic); + } + + @Override + public boolean send(String message) { + + if (message == null || message.isEmpty()) { + throw new IllegalArgumentException("Message to send is empty"); + } + + if (!this.alive) { + throw new IllegalStateException(this + " is stopped"); + } + + try { + synchronized (this) { + this.recentEvents.add(message); + } + + netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic, System.lineSeparator(), + message); + + broadcast(message); + } catch (Exception e) { + logger.warn("{}: cannot send because of {}", this, e.getMessage(), e); + return false; + } + + return true; + } + + @Override + public CommInfrastructure getTopicCommInfrastructure() { + return CommInfrastructure.NOOP; + } + + @Override + public boolean start() { + logger.info("{}: starting", this); + + synchronized (this) { + + if (this.alive) { + return true; + } + + if (locked) { + throw new IllegalStateException(this + " is locked."); + } + + this.alive = true; + } + + return true; + } + + @Override + public boolean stop() { + synchronized (this) { + this.alive = false; + } + return true; + } + + @Override + public void shutdown() { + this.stop(); + } + + @Override + public String toString() { + return "NoopTopicSink [toString()=" + super.toString() + "]"; + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java new file mode 100644 index 00000000..c555d94f --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java @@ -0,0 +1,82 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.List; +import java.util.Properties; + +/** + * Noop Topic Sink Factory + */ +public interface NoopTopicSinkFactory { + + /** + * Creates noop topic sinks based on properties files + * + * @param properties Properties containing initialization values + * + * @return a noop topic sink + * @throws IllegalArgumentException if invalid parameters are present + */ + public List<NoopTopicSink> build(Properties properties); + + /** + * builds a noop sink + * + * @param servers list of servers + * @param topic topic name + * @param managed is this sink endpoint managed? + * @return a noop topic sink + * @throws IllegalArgumentException if invalid parameters are present + */ + public NoopTopicSink build(List<String> servers, String topic, boolean managed); + + /** + * Destroys a sink based on the topic + * + * @param topic topic name + * @throws IllegalArgumentException if invalid parameters are present + */ + public void destroy(String topic); + + /** + * gets a sink based on topic name + * + * @param topic the topic name + * + * @return a sink with topic name + * @throws IllegalArgumentException if an invalid topic is provided + * @throws IllegalStateException if the sink is in an incorrect state + */ + public NoopTopicSink get(String topic); + + /** + * Provides a snapshot of the UEB Topic Writers + * + * @return a list of the UEB Topic Writers + */ + public List<NoopTopicSink> inventory(); + + /** + * Destroys all sinks + */ + public void destroy(); +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java new file mode 100644 index 00000000..0e9398de --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java @@ -0,0 +1,28 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +/** + * Topic Writer over UEB Infrastructure + */ +public interface UebTopicSink extends BusTopicSink { + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java new file mode 100644 index 00000000..37920635 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java @@ -0,0 +1,98 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.List; +import java.util.Properties; + +/** + * UEB Topic Sink Factory + */ +public interface UebTopicSinkFactory { + + /** + * Instantiates a new UEB Topic Writer + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * @param partitionKey Consumer Group + * @param managed is this sink endpoint managed? + * + * @return an UEB Topic Sink + * @throws IllegalArgumentException if invalid parameters are present + */ + public UebTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String partitionKey, + boolean managed, boolean useHttps, boolean allowSelfSignedCerts); + + /** + * Creates an UEB Topic Writer based on properties files + * + * @param properties Properties containing initialization values + * + * @return an UEB Topic Writer + * @throws IllegalArgumentException if invalid parameters are present + */ + public List<UebTopicSink> build(Properties properties); + + /** + * Instantiates a new UEB Topic Writer + * + * @param servers list of servers + * @param topic topic name + * + * @return an UEB Topic Writer + * @throws IllegalArgumentException if invalid parameters are present + */ + public UebTopicSink build(List<String> servers, String topic); + + /** + * Destroys an UEB Topic Writer based on a topic + * + * @param topic topic name + * @throws IllegalArgumentException if invalid parameters are present + */ + public void destroy(String topic); + + /** + * gets an UEB Topic Writer based on topic name + * + * @param topic the topic name + * + * @return an UEB Topic Writer with topic name + * @throws IllegalArgumentException if an invalid topic is provided + * @throws IllegalStateException if the UEB Topic Reader is an incorrect state + */ + public UebTopicSink get(String topic); + + /** + * Provides a snapshot of the UEB Topic Writers + * + * @return a list of the UEB Topic Writers + */ + public List<UebTopicSink> inventory(); + + /** + * Destroys all UEB Topic Writers + */ + public void destroy(); +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java new file mode 100644 index 00000000..db14800c --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java @@ -0,0 +1,29 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +/** + * Topic Source for UEB Communication Infrastructure + * + */ +public interface UebTopicSource extends BusTopicSource { + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java new file mode 100644 index 00000000..d3d632e0 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java @@ -0,0 +1,114 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.List; +import java.util.Properties; + +/** + * UEB Topic Source Factory + */ +public interface UebTopicSourceFactory { + + /** + * Creates an UEB Topic Source based on properties files + * + * @param properties Properties containing initialization values + * + * @return an UEB Topic Source + * @throws IllegalArgumentException if invalid parameters are present + */ + public List<UebTopicSource> build(Properties properties); + + /** + * Instantiates a new UEB Topic Source + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Read Fetch Timeout + * @param fetchLimit Fetch Limit + * @param managed is this source endpoint managed? + * + * @return an UEB Topic Source + * @throws IllegalArgumentException if invalid parameters are present + */ + public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, + String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean managed, + boolean useHttps, boolean allowSelfSignedCerts); + + /** + * Instantiates a new UEB Topic Source + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * + * @return an UEB Topic Source + * @throws IllegalArgumentException if invalid parameters are present + */ + public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret); + + /** + * Instantiates a new UEB Topic Source + * + * @param servers list of servers + * @param topic topic name + * + * @return an UEB Topic Source + * @throws IllegalArgumentException if invalid parameters are present + */ + public UebTopicSource build(List<String> servers, String topic); + + /** + * Destroys an UEB Topic Source based on a topic + * + * @param topic topic name + * @throws IllegalArgumentException if invalid parameters are present + */ + public void destroy(String topic); + + /** + * Destroys all UEB Topic Sources + */ + public void destroy(); + + /** + * gets an UEB Topic Source based on topic name + * + * @param topic the topic name + * @return an UEB Topic Source with topic name + * @throws IllegalArgumentException if an invalid topic is provided + * @throws IllegalStateException if the UEB Topic Source is an incorrect state + */ + public UebTopicSource get(String topic); + + /** + * Provides a snapshot of the UEB Topic Sources + * + * @return a list of the UEB Topic Sources + */ + public List<UebTopicSource> inventory(); +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedDmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedDmaapTopicSinkFactory.java new file mode 100644 index 00000000..e252988e --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedDmaapTopicSinkFactory.java @@ -0,0 +1,325 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.impl; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory; +import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory of DMAAP Reader Topics indexed by topic name + */ +public class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { + private static final String MISSING_TOPIC = "A topic must be provided"; + + private static final IndexedDmaapTopicSinkFactory instance = new IndexedDmaapTopicSinkFactory(); + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class); + + /** + * DMAAP Topic Name Index + */ + protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>(); + + /** + * Get the singleton instance. + * + * @return the instance + */ + public static IndexedDmaapTopicSinkFactory getInstance() { + return instance; + } + + private IndexedDmaapTopicSinkFactory() {} + + @Override + public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, + String password, String partitionKey, String environment, String aftEnvironment, String partner, + String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps, + boolean allowSelfSignedCerts) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (dmaapTopicWriters.containsKey(topic)) { + return dmaapTopicWriters.get(topic); + } + + DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(servers, topic, apiKey, apiSecret, userName, + password, partitionKey, environment, aftEnvironment, partner, latitude, longitude, additionalProps, + useHttps, allowSelfSignedCerts); + + if (managed) { + dmaapTopicWriters.put(topic, dmaapTopicSink); + } + return dmaapTopicSink; + } + } + + @Override + public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, + String password, String partitionKey, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (dmaapTopicWriters.containsKey(topic)) { + return dmaapTopicWriters.get(topic); + } + + DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(servers, topic, apiKey, apiSecret, userName, + password, partitionKey, useHttps, allowSelfSignedCerts); + + if (managed) { + dmaapTopicWriters.put(topic, dmaapTopicSink); + } + return dmaapTopicSink; + } + } + + @Override + public DmaapTopicSink build(List<String> servers, String topic) { + return this.build(servers, topic, null, null, null, null, null, true, false, false); + } + + @Override + public List<DmaapTopicSink> build(Properties properties) { + + String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS); + if (writeTopics == null || writeTopics.isEmpty()) { + logger.info("{}: no topic for DMaaP Sink", this); + return new ArrayList<>(); + } + + List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*"))); + List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>(); + synchronized (this) { + for (String topic : writeTopicList) { + if (this.dmaapTopicWriters.containsKey(topic)) { + newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic)); + continue; + } + String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + + List<String> serverList; + if (servers != null && !servers.isEmpty()) { + serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + } else { + serverList = new ArrayList<>(); + } + + String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); + String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); + + String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX); + String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX); + + String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX); + + String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); + + /* DME2 Properties */ + + String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + + String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + + String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX); + + String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX); + + String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + + String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + + String dme2EpReadTimeoutMs = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX); + + String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX); + + String dme2RoundtripTimeoutMs = + properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX); + + String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX); + + String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX); + + String dme2SessionStickinessRequired = + properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX); + + Map<String, String> dme2AdditionalProps = new HashMap<>(); + + if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) { + dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs); + } + if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) { + dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout); + } + if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) { + dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs); + } + if (dme2Version != null && !dme2Version.isEmpty()) { + dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version); + } + if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) { + dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + } + if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) { + dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath); + } + if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) { + dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired); + } + + if (servers == null || servers.isEmpty()) { + logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); + continue; + } + + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + + // default is to use HTTP if no https property exists + boolean useHttps = false; + if (useHttpsString != null && !useHttpsString.isEmpty()) { + useHttps = Boolean.parseBoolean(useHttpsString); + } + + + String allowSelfSignedCertsString = + properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); + + // default is to disallow self-signed certs + boolean allowSelfSignedCerts = false; + if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { + allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); + } + + DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, apiKey, apiSecret, aafMechId, aafPassword, + partitionKey, dme2Environment, dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude, + dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts); + + newDmaapTopicSinks.add(dmaapTopicSink); + } + return newDmaapTopicSinks; + } + } + + @Override + public void destroy(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + DmaapTopicSink dmaapTopicWriter; + synchronized (this) { + if (!dmaapTopicWriters.containsKey(topic)) { + return; + } + + dmaapTopicWriter = dmaapTopicWriters.remove(topic); + } + + dmaapTopicWriter.shutdown(); + } + + @Override + public void destroy() { + List<DmaapTopicSink> writers = this.inventory(); + for (DmaapTopicSink writer : writers) { + writer.shutdown(); + } + + synchronized (this) { + this.dmaapTopicWriters.clear(); + } + } + + @Override + public DmaapTopicSink get(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (dmaapTopicWriters.containsKey(topic)) { + return dmaapTopicWriters.get(topic); + } else { + throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); + } + } + } + + @Override + public synchronized List<DmaapTopicSink> inventory() { + return new ArrayList<>(this.dmaapTopicWriters.values()); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("IndexedDmaapTopicSinkFactory []"); + return builder.toString(); + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedDmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedDmaapTopicSourceFactory.java new file mode 100644 index 00000000..0e469a1d --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedDmaapTopicSourceFactory.java @@ -0,0 +1,401 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.impl; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSourceFactory; +import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory of DMAAP Source Topics indexed by topic name + */ + +public class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { + private static final String MISSING_TOPIC = "A topic must be provided"; + + private static final IndexedDmaapTopicSourceFactory instance = new IndexedDmaapTopicSourceFactory(); + + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class); + + /** + * DMaaP Topic Name Index + */ + protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>(); + + /** + * Get the singleton instance. + * + * @return the instance + */ + public static IndexedDmaapTopicSourceFactory getInstance() { + return instance; + } + + private IndexedDmaapTopicSourceFactory() {} + + /** + * {@inheritDoc} + */ + @Override + public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, + String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, + String environment, String aftEnvironment, String partner, String latitude, String longitude, + Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (dmaapTopicSources.containsKey(topic)) { + return dmaapTopicSources.get(topic); + } + + DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(servers, topic, apiKey, apiSecret, + userName, password, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, environment, + aftEnvironment, partner, latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts); + + if (managed) { + dmaapTopicSources.put(topic, dmaapTopicSource); + } + + return dmaapTopicSource; + } + } + + /** + * {@inheritDoc} + */ + @Override + public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, + String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, + boolean managed, boolean useHttps, boolean allowSelfSignedCerts) { + + if (servers == null || servers.isEmpty()) { + throw new IllegalArgumentException("DMaaP Server(s) must be provided"); + } + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (dmaapTopicSources.containsKey(topic)) { + return dmaapTopicSources.get(topic); + } + + DmaapTopicSource dmaapTopicSource = + new SingleThreadedDmaapTopicSource(servers, topic, apiKey, apiSecret, userName, password, + consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts); + + if (managed) { + dmaapTopicSources.put(topic, dmaapTopicSource); + } + + return dmaapTopicSource; + } + } + + /** + * {@inheritDoc} + */ + @Override + public List<DmaapTopicSource> build(Properties properties) { + + String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS); + if (readTopics == null || readTopics.isEmpty()) { + logger.info("{}: no topic for DMaaP Source", this); + return new ArrayList<>(); + } + List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*"))); + + List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>(); + synchronized (this) { + for (String topic : readTopicList) { + if (this.dmaapTopicSources.containsKey(topic)) { + dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic)); + continue; + } + + String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + + List<String> serverList; + if (servers != null && !servers.isEmpty()) { + serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + } else { + serverList = new ArrayList<>(); + } + + String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); + + String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); + + String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX); + + String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX); + + String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX); + + String consumerInstance = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX); + + String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); + + /* DME2 Properties */ + + String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + + String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + + String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX); + + String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX); + + String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + + String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + + String dme2EpReadTimeoutMs = + properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX); + + String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX); + + String dme2RoundtripTimeoutMs = + properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX); + + String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX); + + String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX); + + String dme2SessionStickinessRequired = + properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX); + + Map<String, String> dme2AdditionalProps = new HashMap<>(); + + if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) { + dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs); + } + if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) { + dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout); + } + if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) { + dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs); + } + if (dme2Version != null && !dme2Version.isEmpty()) { + dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version); + } + if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) { + dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + } + if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) { + dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath); + } + if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) { + dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired); + } + + + if (servers == null || servers.isEmpty()) { + + logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); + continue; + } + + int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH; + if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) { + try { + fetchTimeout = Integer.parseInt(fetchTimeoutString); + } catch (NumberFormatException nfe) { + logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString, + topic); + } + } + + String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX); + int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH; + if (fetchLimitString != null && !fetchLimitString.isEmpty()) { + try { + fetchLimit = Integer.parseInt(fetchLimitString); + } catch (NumberFormatException nfe) { + logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString, + topic); + } + } + + String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + + // default is to use HTTP if no https property exists + boolean useHttps = false; + if (useHttpsString != null && !useHttpsString.isEmpty()) { + useHttps = Boolean.parseBoolean(useHttpsString); + } + + String allowSelfSignedCertsString = + properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); + + // default is to disallow self-signed certs + boolean allowSelfSignedCerts = false; + if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { + allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); + } + + + DmaapTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, aafMechId, + aafPassword, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, dme2Environment, + dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude, dme2AdditionalProps, managed, + useHttps, allowSelfSignedCerts); + + dmaapTopicSourceLst.add(uebTopicSource); + } + } + return dmaapTopicSourceLst; + } + + /** + * {@inheritDoc} + * + * @throws IllegalArgumentException + */ + @Override + public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) { + return this.build(servers, topic, apiKey, apiSecret, null, null, null, null, + DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, DmaapTopicSource.DEFAULT_LIMIT_FETCH, true, false, false); + } + + /** + * {@inheritDoc} + * + * @throws IllegalArgumentException + */ + @Override + public DmaapTopicSource build(List<String> servers, String topic) { + return this.build(servers, topic, null, null); + } + + /** + * {@inheritDoc} + */ + @Override + public void destroy(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + DmaapTopicSource uebTopicSource; + + synchronized (this) { + if (!dmaapTopicSources.containsKey(topic)) { + return; + } + + uebTopicSource = dmaapTopicSources.remove(topic); + } + + uebTopicSource.shutdown(); + } + + /** + * {@inheritDoc} + */ + @Override + public DmaapTopicSource get(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (dmaapTopicSources.containsKey(topic)) { + return dmaapTopicSources.get(topic); + } else { + throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found"); + } + } + } + + @Override + public synchronized List<DmaapTopicSource> inventory() { + return new ArrayList<>(this.dmaapTopicSources.values()); + } + + @Override + public void destroy() { + List<DmaapTopicSource> readers = this.inventory(); + for (DmaapTopicSource reader : readers) { + reader.shutdown(); + } + + synchronized (this) { + this.dmaapTopicSources.clear(); + } + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("IndexedDmaapTopicSourceFactory []"); + return builder.toString(); + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedNoopTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedNoopTopicSinkFactory.java new file mode 100644 index 00000000..78bfd46c --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedNoopTopicSinkFactory.java @@ -0,0 +1,185 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.impl; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; + +import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSinkFactory; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory of noop sinks + */ +public class IndexedNoopTopicSinkFactory implements NoopTopicSinkFactory { + private static final String MISSING_TOPIC = "A topic must be provided"; + + private static final IndexedNoopTopicSinkFactory instance = new IndexedNoopTopicSinkFactory(); + + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class); + + /** + * noop topic sinks map + */ + protected HashMap<String, NoopTopicSink> noopTopicSinks = new HashMap<>(); + + /** + * Get the singleton instance. + * + * @return the instance + */ + public static IndexedNoopTopicSinkFactory getInstance() { + return instance; + } + + private IndexedNoopTopicSinkFactory() {} + + @Override + public List<NoopTopicSink> build(Properties properties) { + + final String sinkTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS); + if (sinkTopics == null || sinkTopics.isEmpty()) { + logger.info("{}: no topic for noop sink", this); + return new ArrayList<>(); + } + + final List<String> sinkTopicList = new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*"))); + final List<NoopTopicSink> newSinks = new ArrayList<>(); + synchronized (this) { + for (final String topic : sinkTopicList) { + if (this.noopTopicSinks.containsKey(topic)) { + newSinks.add(this.noopTopicSinks.get(topic)); + continue; + } + + String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + + if (servers == null || servers.isEmpty()) { + servers = "noop"; + } + + final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + + final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + final NoopTopicSink noopSink = this.build(serverList, topic, managed); + newSinks.add(noopSink); + } + return newSinks; + } + } + + @Override + public NoopTopicSink build(List<String> servers, String topic, boolean managed) { + + List<String> noopSinkServers = servers; + if (noopSinkServers == null) { + noopSinkServers = new ArrayList<>(); + } + + if (noopSinkServers.isEmpty()) { + noopSinkServers.add("noop"); + } + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (this.noopTopicSinks.containsKey(topic)) { + return this.noopTopicSinks.get(topic); + } + + final NoopTopicSink sink = new NoopTopicSink(noopSinkServers, topic); + + if (managed) { + this.noopTopicSinks.put(topic, sink); + } + + return sink; + } + } + + @Override + public void destroy(String topic) { + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + NoopTopicSink noopSink; + synchronized (this) { + if (!this.noopTopicSinks.containsKey(topic)) { + return; + } + + noopSink = this.noopTopicSinks.remove(topic); + } + + noopSink.shutdown(); + } + + @Override + public void destroy() { + final List<NoopTopicSink> sinks = this.inventory(); + for (final NoopTopicSink sink : sinks) { + sink.shutdown(); + } + + synchronized (this) { + this.noopTopicSinks.clear(); + } + } + + @Override + public NoopTopicSink get(String topic) { + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (this.noopTopicSinks.containsKey(topic)) { + return this.noopTopicSinks.get(topic); + } else { + throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); + } + } + } + + @Override + public List<NoopTopicSink> inventory() { + return new ArrayList<>(this.noopTopicSinks.values()); + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedUebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedUebTopicSinkFactory.java new file mode 100644 index 00000000..19bdc7b2 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedUebTopicSinkFactory.java @@ -0,0 +1,229 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.impl; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; + +import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSinkFactory; +import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory of UEB Reader Topics indexed by topic name + */ +public class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { + + private static final IndexedUebTopicSinkFactory instance = new IndexedUebTopicSinkFactory(); + + private static final String MISSING_TOPIC = "A topic must be provided"; + + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class); + + /** + * UEB Topic Name Index + */ + protected HashMap<String, UebTopicSink> uebTopicSinks = new HashMap<>(); + + /** + * Get the singleton instance. + * + * @return the instance + */ + public static IndexedUebTopicSinkFactory getInstance() { + return instance; + } + + private IndexedUebTopicSinkFactory() {} + + @Override + public UebTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String partitionKey, + boolean managed, boolean useHttps, boolean allowSelfSignedCerts) { + + if (servers == null || servers.isEmpty()) { + throw new IllegalArgumentException("UEB Server(s) must be provided"); + } + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (uebTopicSinks.containsKey(topic)) { + return uebTopicSinks.get(topic); + } + + UebTopicSink uebTopicWriter = new InlineUebTopicSink(servers, topic, apiKey, apiSecret, partitionKey, + useHttps, allowSelfSignedCerts); + + if (managed) { + uebTopicSinks.put(topic, uebTopicWriter); + } + + return uebTopicWriter; + } + } + + + @Override + public UebTopicSink build(List<String> servers, String topic) { + return this.build(servers, topic, null, null, null, true, false, false); + } + + + @Override + public List<UebTopicSink> build(Properties properties) { + + String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS); + if (writeTopics == null || writeTopics.isEmpty()) { + logger.info("{}: no topic for UEB Sink", this); + return new ArrayList<>(); + } + + List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*"))); + List<UebTopicSink> newUebTopicSinks = new ArrayList<>(); + synchronized (this) { + for (String topic : writeTopicList) { + if (this.uebTopicSinks.containsKey(topic)) { + newUebTopicSinks.add(this.uebTopicSinks.get(topic)); + continue; + } + + String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + if (servers == null || servers.isEmpty()) { + logger.error("{}: no UEB servers configured for sink {}", this, topic); + continue; + } + + List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + + String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); + String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); + String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX); + + String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + + // default is to use HTTP if no https property exists + boolean useHttps = false; + if (useHttpsString != null && !useHttpsString.isEmpty()) { + useHttps = Boolean.parseBoolean(useHttpsString); + } + + + String allowSelfSignedCertsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); + + // default is to disallow self-signed certs + boolean allowSelfSignedCerts = false; + if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { + allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); + } + + UebTopicSink uebTopicWriter = this.build(serverList, topic, apiKey, apiSecret, partitionKey, managed, + useHttps, allowSelfSignedCerts); + newUebTopicSinks.add(uebTopicWriter); + } + return newUebTopicSinks; + } + } + + @Override + public void destroy(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + UebTopicSink uebTopicWriter; + synchronized (this) { + if (!uebTopicSinks.containsKey(topic)) { + return; + } + + uebTopicWriter = uebTopicSinks.remove(topic); + } + + uebTopicWriter.shutdown(); + } + + @Override + public void destroy() { + List<UebTopicSink> writers = this.inventory(); + for (UebTopicSink writer : writers) { + writer.shutdown(); + } + + synchronized (this) { + this.uebTopicSinks.clear(); + } + } + + @Override + public UebTopicSink get(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (uebTopicSinks.containsKey(topic)) { + return uebTopicSinks.get(topic); + } else { + throw new IllegalStateException("UebTopicSink for " + topic + " not found"); + } + } + } + + @Override + public synchronized List<UebTopicSink> inventory() { + return new ArrayList<>(this.uebTopicSinks.values()); + } + + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("IndexedUebTopicSinkFactory []"); + return builder.toString(); + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedUebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedUebTopicSourceFactory.java new file mode 100644 index 00000000..5363a30d --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedUebTopicSourceFactory.java @@ -0,0 +1,280 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.impl; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; + +import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSourceFactory; +import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory of UEB Source Topics indexed by topic name + */ +public class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { + private static final String MISSING_TOPIC = "A topic must be provided"; + + private static final IndexedUebTopicSourceFactory instance = new IndexedUebTopicSourceFactory(); + + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class); + + /** + * UEB Topic Name Index + */ + protected HashMap<String, UebTopicSource> uebTopicSources = new HashMap<>(); + + /** + * Get the singleton instance. + * + * @return the instance + */ + public static IndexedUebTopicSourceFactory getInstance() { + return instance; + } + + private IndexedUebTopicSourceFactory() {} + + /** + * {@inheritDoc} + */ + @Override + public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, + String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean managed, + boolean useHttps, boolean allowSelfSignedCerts) { + if (servers == null || servers.isEmpty()) { + throw new IllegalArgumentException("UEB Server(s) must be provided"); + } + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (uebTopicSources.containsKey(topic)) { + return uebTopicSources.get(topic); + } + + UebTopicSource uebTopicSource = new SingleThreadedUebTopicSource(servers, topic, apiKey, apiSecret, + consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts); + + if (managed) { + uebTopicSources.put(topic, uebTopicSource); + } + + return uebTopicSource; + } + } + + /** + * {@inheritDoc} + */ + @Override + public List<UebTopicSource> build(Properties properties) { + + String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS); + if (readTopics == null || readTopics.isEmpty()) { + logger.info("{}: no topic for UEB Source", this); + return new ArrayList<>(); + } + List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*"))); + + List<UebTopicSource> newUebTopicSources = new ArrayList<>(); + synchronized (this) { + for (String topic : readTopicList) { + if (this.uebTopicSources.containsKey(topic)) { + newUebTopicSources.add(this.uebTopicSources.get(topic)); + continue; + } + + String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + + if (servers == null || servers.isEmpty()) { + logger.error("{}: no UEB servers configured for sink {}", this, topic); + continue; + } + + List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + + String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); + + String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); + + String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX); + + String consumerInstance = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX); + + String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); + int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH; + if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) { + try { + fetchTimeout = Integer.parseInt(fetchTimeoutString); + } catch (NumberFormatException nfe) { + logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString, + topic); + } + } + + String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX); + int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH; + if (fetchLimitString != null && !fetchLimitString.isEmpty()) { + try { + fetchLimit = Integer.parseInt(fetchLimitString); + } catch (NumberFormatException nfe) { + logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString, + topic); + } + } + + String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + + // default is to use HTTP if no https property exists + boolean useHttps = false; + if (useHttpsString != null && !useHttpsString.isEmpty()) { + useHttps = Boolean.parseBoolean(useHttpsString); + } + + String allowSelfSignedCertsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); + + // default is to disallow self-signed certs + boolean allowSelfSignedCerts = false; + if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { + allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); + } + + UebTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, consumerGroup, + consumerInstance, fetchTimeout, fetchLimit, managed, useHttps, allowSelfSignedCerts); + newUebTopicSources.add(uebTopicSource); + } + } + return newUebTopicSources; + } + + /** + * {@inheritDoc} + */ + @Override + public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) { + + return this.build(servers, topic, apiKey, apiSecret, null, null, UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH, + UebTopicSource.DEFAULT_LIMIT_FETCH, true, false, true); + } + + /** + * {@inheritDoc} + */ + @Override + public UebTopicSource build(List<String> servers, String topic) { + return this.build(servers, topic, null, null); + } + + /** + * {@inheritDoc} + */ + @Override + public void destroy(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + UebTopicSource uebTopicSource; + + synchronized (this) { + if (!uebTopicSources.containsKey(topic)) { + return; + } + + uebTopicSource = uebTopicSources.remove(topic); + } + + uebTopicSource.shutdown(); + } + + /** + * {@inheritDoc} + */ + @Override + public UebTopicSource get(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (uebTopicSources.containsKey(topic)) { + return uebTopicSources.get(topic); + } else { + throw new IllegalStateException("UebTopiceSource for " + topic + " not found"); + } + } + } + + @Override + public synchronized List<UebTopicSource> inventory() { + return new ArrayList<>(this.uebTopicSources.values()); + } + + @Override + public void destroy() { + List<UebTopicSource> readers = this.inventory(); + for (UebTopicSource reader : readers) { + reader.shutdown(); + } + + synchronized (this) { + this.uebTopicSources.clear(); + } + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("IndexedUebTopicSourceFactory []"); + return builder.toString(); + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java new file mode 100644 index 00000000..8cc631c8 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -0,0 +1,45 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +import java.io.IOException; + +/** + * Wrapper around libraries to consume from message bus + * + */ +public interface BusConsumer { + + /** + * fetch messages + * + * @return list of messages + * @throws Exception when error encountered by underlying libraries + */ + public Iterable<String> fetch() throws InterruptedException, IOException; + + /** + * close underlying library consumer + */ + public void close(); +} + + diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java new file mode 100644 index 00000000..f6c65ada --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +public interface BusPublisher { + + /** + * sends a message + * + * @param partition id + * @param message the message + * @return true if success, false otherwise + * @throws IllegalArgumentException if no message provided + */ + public boolean send(String partitionId, String message); + + /** + * closes the publisher + */ + public void close(); +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java new file mode 100644 index 00000000..7f4c0ddd --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java @@ -0,0 +1,127 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +import java.util.List; + +import org.onap.policy.common.endpoints.event.comm.bus.ApiKeyEnabled; + +/** + * Bus Topic Base + */ +public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled { + + /** + * API Key + */ + protected String apiKey; + + /** + * API Secret + */ + protected String apiSecret; + + /** + * Use https + */ + protected boolean useHttps; + + /** + * allow self signed certificates + */ + protected boolean allowSelfSignedCerts; + + /** + * Instantiates a new Bus Topic Base + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * @param useHttps does connection use HTTPS? + * @param allowSelfSignedCerts are self-signed certificates allow + * + * @return a Bus Topic Base + * @throws IllegalArgumentException if invalid parameters are present + */ + public BusTopicBase(List<String> servers, String topic, String apiKey, String apiSecret, boolean useHttps, + boolean allowSelfSignedCerts) { + + super(servers, topic); + + this.apiKey = apiKey; + this.apiSecret = apiSecret; + this.useHttps = useHttps; + this.allowSelfSignedCerts = allowSelfSignedCerts; + } + + @Override + public String getApiKey() { + return apiKey; + } + + @Override + public String getApiSecret() { + return apiSecret; + } + + /** + * @return if using https + */ + public boolean isUseHttps() { + return useHttps; + } + + /** + * @return if self signed certificates are allowed + */ + public boolean isAllowSelfSignedCerts() { + return allowSelfSignedCerts; + } + + protected boolean anyNullOrEmpty(String... args) { + for (String arg : args) { + if (arg == null || arg.isEmpty()) { + return true; + } + } + + return false; + } + + protected boolean allNullOrEmpty(String... args) { + for (String arg : args) { + if (!(arg == null || arg.isEmpty())) { + return false; + } + } + + return true; + } + + + @Override + public String toString() { + return "BusTopicBase [apiKey=" + apiKey + ", apiSecret=" + apiSecret + ", useHttps=" + useHttps + + ", allowSelfSignedCerts=" + allowSelfSignedCerts + ", toString()=" + super.toString() + "]"; + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/FilterableBusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/FilterableBusConsumer.java new file mode 100644 index 00000000..76adf980 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/FilterableBusConsumer.java @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +/** + * BusConsumer that supports server-side filtering. + */ +public interface FilterableBusConsumer extends BusConsumer { + + /** + * Sets the server-side filter. + * + * @param filter new filter value, or {@code null} + * @throws IllegalArgumentException if the consumer cannot be built with the new filter + */ + public void setFilter(String filter); +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java new file mode 100644 index 00000000..f3c736da --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java @@ -0,0 +1,215 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +import java.util.List; +import java.util.UUID; + +import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Transport Agnostic Bus Topic Sink to carry out the core functionality to interact with a sink + * regardless if it is UEB or DMaaP. + * + */ +public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink { + + /** + * loggers + */ + private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class); + private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER); + + /** + * The partition key to publish to + */ + protected String partitionId; + + /** + * message bus publisher + */ + protected BusPublisher publisher; + + /** + * constructor for abstract sink + * + * @param servers servers + * @param topic topic + * @param apiKey api secret + * @param apiSecret api secret + * @param partitionId partition id + * @param useHttps does connection use HTTPS? + * @param allowSelfSignedCerts are self-signed certificates allow + * @throws IllegalArgumentException in invalid parameters are passed in + */ + public InlineBusTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String partitionId, + boolean useHttps, boolean allowSelfSignedCerts) { + + super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts); + + if (partitionId == null || partitionId.isEmpty()) { + this.partitionId = UUID.randomUUID().toString(); + } + } + + /** + * Initialize the Bus publisher + */ + public abstract void init(); + + /** + * {@inheritDoc} + */ + @Override + public boolean start() { + logger.info("{}: starting", this); + + synchronized (this) { + + if (this.alive) { + return true; + } + + if (locked) { + throw new IllegalStateException(this + " is locked."); + } + + this.alive = true; + } + + this.init(); + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean stop() { + + BusPublisher publisherCopy; + synchronized (this) { + this.alive = false; + publisherCopy = this.publisher; + this.publisher = null; + } + + if (publisherCopy != null) { + try { + publisherCopy.close(); + } catch (Exception e) { + logger.warn("{}: cannot stop publisher because of {}", this, e.getMessage(), e); + } + } else { + logger.warn("{}: there is no publisher", this); + return false; + } + + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean send(String message) { + + if (message == null || message.isEmpty()) { + throw new IllegalArgumentException("Message to send is empty"); + } + + if (!this.alive) { + throw new IllegalStateException(this + " is stopped"); + } + + try { + synchronized (this) { + this.recentEvents.add(message); + } + + netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic, System.lineSeparator(), + message); + + publisher.send(this.partitionId, message); + broadcast(message); + } catch (Exception e) { + logger.warn("{}: cannot send because of {}", this, e.getMessage(), e); + return false; + } + + return true; + } + + + /** + * {@inheritDoc} + */ + @Override + public void setPartitionKey(String partitionKey) { + this.partitionId = partitionKey; + } + + /** + * {@inheritDoc} + */ + @Override + public String getPartitionKey() { + return this.partitionId; + } + + /** + * {@inheritDoc} + */ + @Override + public void shutdown() { + this.stop(); + } + + @Override + protected boolean anyNullOrEmpty(String... args) { + for (String arg : args) { + if (arg == null || arg.isEmpty()) { + return true; + } + } + + return false; + } + + @Override + protected boolean allNullOrEmpty(String... args) { + for (String arg : args) { + if (!(arg == null || arg.isEmpty())) { + return false; + } + } + + return true; + } + + + @Override + public String toString() { + return "InlineBusTopicSink [partitionId=" + partitionId + ", alive=" + alive + ", publisher=" + publisher + "]"; + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java new file mode 100644 index 00000000..2ad8bb34 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java @@ -0,0 +1,133 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +import java.util.List; +import java.util.Map; + +import org.onap.policy.common.endpoints.event.comm.Topic; +import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.CambriaPublisherWrapper; +import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.DmaapDmePublisherWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This implementation publishes events for the associated DMAAP topic, inline with the calling + * thread. + */ +public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTopicSink { + + protected static Logger logger = LoggerFactory.getLogger(InlineDmaapTopicSink.class); + + protected final String userName; + protected final String password; + + protected String environment = null; + protected String aftEnvironment = null; + protected String partner = null; + protected String latitude = null; + protected String longitude = null; + + protected Map<String, String> additionalProps = null; + + /** + * + * @param servers DMaaP servers + * @param topic DMaaP Topic to be monitored + * @param apiKey DMaaP API Key (optional) + * @param apiSecret DMaaP API Secret (optional) + * @param consumerGroup DMaaP Reader Consumer Group + * @param consumerInstance DMaaP Reader Instance + * @param fetchTimeout DMaaP fetch timeout + * @param fetchLimit DMaaP fetch limit + * @param environment DME2 Environment + * @param aftEnvironment DME2 AFT Environment + * @param partner DME2 Partner + * @param latitude DME2 Latitude + * @param longitude DME2 Longitude + * @param additionalProps Additional properties to pass to DME2 + * @param useHttps does connection use HTTPS? + * @param allowSelfSignedCerts are self-signed certificates allow + * + * @throws IllegalArgumentException An invalid parameter passed in + */ + public InlineDmaapTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String userName, + String password, String partitionKey, String environment, String aftEnvironment, String partner, + String latitude, String longitude, Map<String, String> additionalProps, boolean useHttps, + boolean allowSelfSignedCerts) { + + super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts); + + this.userName = userName; + this.password = password; + + this.environment = environment; + this.aftEnvironment = aftEnvironment; + this.partner = partner; + + this.latitude = latitude; + this.longitude = longitude; + + this.additionalProps = additionalProps; + } + + public InlineDmaapTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String userName, + String password, String partitionKey, boolean useHttps, boolean allowSelfSignedCerts) { + + super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts); + + this.userName = userName; + this.password = password; + } + + + @Override + public void init() { + if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) { + this.publisher = new CambriaPublisherWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, + this.userName, this.password, this.useHttps); + } else { + this.publisher = new DmaapDmePublisherWrapper(this.servers, this.topic, this.userName, this.password, + this.environment, this.aftEnvironment, this.partner, this.latitude, this.longitude, + this.additionalProps, this.useHttps); + } + + logger.info("{}: DMAAP SINK created", this); + } + + /** + * {@inheritDoc} + */ + @Override + public CommInfrastructure getTopicCommInfrastructure() { + return Topic.CommInfrastructure.DMAAP; + } + + + @Override + public String toString() { + return "InlineDmaapTopicSink [userName=" + userName + ", password=" + password + + ", getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()=" + super.toString() + + "]"; + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java new file mode 100644 index 00000000..24cf6073 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java @@ -0,0 +1,86 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +import java.util.List; + +import org.onap.policy.common.endpoints.event.comm.Topic; +import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.CambriaPublisherWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This implementation publishes events for the associated UEB topic, inline with the calling + * thread. + */ +public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSink { + + /** + * logger + */ + private static Logger logger = LoggerFactory.getLogger(InlineUebTopicSink.class); + + /** + * Argument-based UEB Topic Writer instantiation + * + * @param servers list of UEB servers available for publishing + * @param topic the topic to publish to + * @param apiKey the api key (optional) + * @param apiSecret the api secret (optional) + * @param partitionId the partition key (optional, autogenerated if not provided) + * @param useHttps does connection use HTTPS? + * @param allowSelfSignedCerts are self-signed certificates allow + * + * @throws IllegalArgumentException if invalid arguments are detected + */ + public InlineUebTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String partitionId, + boolean useHttps, boolean allowSelfSignedCerts) { + super(servers, topic, apiKey, apiSecret, partitionId, useHttps, allowSelfSignedCerts); + } + + /** + * Instantiation of internal resources + */ + @Override + public void init() { + + this.publisher = + new CambriaPublisherWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, this.useHttps); + logger.info("{}: UEB SINK created", this); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("InlineUebTopicSink [getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()) + .append(", toString()=").append(super.toString()).append("]"); + return builder.toString(); + } + + /** + * {@inheritDoc} + */ + @Override + public CommInfrastructure getTopicCommInfrastructure() { + return Topic.CommInfrastructure.UEB; + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java new file mode 100644 index 00000000..d59c63f5 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -0,0 +1,324 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +import java.net.MalformedURLException; +import java.util.List; +import java.util.UUID; + +import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource; +import org.onap.policy.common.endpoints.event.comm.TopicListener; +import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource; +import org.onap.policy.common.utils.network.NetworkUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This topic source implementation specializes in reading messages over a bus topic source and + * notifying its listeners + */ +public abstract class SingleThreadedBusTopicSource extends BusTopicBase + implements Runnable, BusTopicSource, FilterableTopicSource { + + /** + * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only + * that in a single file in a concise format. + */ + private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class); + private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER); + + /** + * Bus consumer group + */ + protected final String consumerGroup; + + /** + * Bus consumer instance + */ + protected final String consumerInstance; + + /** + * Bus fetch timeout + */ + protected final int fetchTimeout; + + /** + * Bus fetch limit + */ + protected final int fetchLimit; + + /** + * Message Bus Consumer + */ + protected BusConsumer consumer; + + /** + * Independent thread reading message over my topic + */ + protected Thread busPollerThread; + + + /** + * + * @param servers Bus servers + * @param topic Bus Topic to be monitored + * @param apiKey Bus API Key (optional) + * @param apiSecret Bus API Secret (optional) + * @param consumerGroup Bus Reader Consumer Group + * @param consumerInstance Bus Reader Instance + * @param fetchTimeout Bus fetch timeout + * @param fetchLimit Bus fetch limit + * @param useHttps does the bus use https + * @param allowSelfSignedCerts are self-signed certificates allowed + * @throws IllegalArgumentException An invalid parameter passed in + */ + public SingleThreadedBusTopicSource(List<String> servers, String topic, String apiKey, String apiSecret, + String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps, + boolean allowSelfSignedCerts) { + + super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts); + + if (consumerGroup == null || consumerGroup.isEmpty()) { + this.consumerGroup = UUID.randomUUID().toString(); + } else { + this.consumerGroup = consumerGroup; + } + + if (consumerInstance == null || consumerInstance.isEmpty()) { + this.consumerInstance = NetworkUtil.getHostname(); + } else { + this.consumerInstance = consumerInstance; + } + + if (fetchTimeout <= 0) { + this.fetchTimeout = NO_TIMEOUT_MS_FETCH; + } else { + this.fetchTimeout = fetchTimeout; + } + + if (fetchLimit <= 0) { + this.fetchLimit = NO_LIMIT_FETCH; + } else { + this.fetchLimit = fetchLimit; + } + + } + + /** + * Initialize the Bus client + */ + public abstract void init() throws MalformedURLException; + + @Override + public void register(TopicListener topicListener) { + + super.register(topicListener); + + try { + if (!alive && !locked) { + this.start(); + } else { + logger.info("{}: register: start not attempted", this); + } + } catch (Exception e) { + logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e.getMessage(), + e); + } + } + + @Override + public void unregister(TopicListener topicListener) { + boolean stop; + synchronized (this) { + super.unregister(topicListener); + stop = this.topicListeners.isEmpty(); + } + + if (stop) { + this.stop(); + } + } + + @Override + public boolean start() { + logger.info("{}: starting", this); + + synchronized (this) { + + if (alive) { + return true; + } + + if (locked) { + throw new IllegalStateException(this + " is locked."); + } + + if (this.busPollerThread == null || !this.busPollerThread.isAlive() || this.consumer == null) { + + try { + this.init(); + this.alive = true; + this.busPollerThread = new Thread(this); + this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic()); + busPollerThread.start(); + } catch (Exception e) { + logger.warn("{}: cannot start because of {}", this, e.getMessage(), e); + throw new IllegalStateException(e); + } + } + } + + return this.alive; + } + + @Override + public boolean stop() { + logger.info("{}: stopping", this); + + synchronized (this) { + BusConsumer consumerCopy = this.consumer; + + this.alive = false; + this.consumer = null; + + if (consumerCopy != null) { + try { + consumerCopy.close(); + } catch (Exception e) { + logger.warn("{}: stop failed because of {}", this, e.getMessage(), e); + } + } + } + + Thread.yield(); + + return true; + } + + /** + * Run thread method for the Bus Reader + */ + @Override + public void run() { + while (this.alive) { + try { + for (String event : this.consumer.fetch()) { + synchronized (this) { + this.recentEvents.add(event); + } + + netLogger.info("[IN|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic, + System.lineSeparator(), event); + + broadcast(event); + + if (!this.alive) { + break; + } + } + } catch (Exception e) { + logger.error("{}: cannot fetch because of ", this, e.getMessage(), e); + } + } + + logger.info("{}: exiting thread", this); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean offer(String event) { + if (!this.alive) { + throw new IllegalStateException(this + " is not alive."); + } + + synchronized (this) { + this.recentEvents.add(event); + } + + netLogger.info("[IN|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic, System.lineSeparator(), event); + + + return broadcast(event); + } + + + @Override + public void setFilter(String filter) { + if (consumer instanceof FilterableBusConsumer) { + ((FilterableBusConsumer) consumer).setFilter(filter); + + } else { + throw new UnsupportedOperationException("no server-side filtering for topic " + topic); + } + } + + @Override + public String toString() { + return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance + + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer=" + this.consumer + + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread + ", topicListeners=" + + topicListeners.size() + ", toString()=" + super.toString() + "]"; + } + + /** + * {@inheritDoc} + */ + @Override + public String getConsumerGroup() { + return consumerGroup; + } + + /** + * {@inheritDoc} + */ + @Override + public String getConsumerInstance() { + return consumerInstance; + } + + /** + * {@inheritDoc} + */ + @Override + public void shutdown() { + this.stop(); + this.topicListeners.clear(); + } + + /** + * {@inheritDoc} + */ + @Override + public int getFetchTimeout() { + return fetchTimeout; + } + + /** + * {@inheritDoc} + */ + @Override + public int getFetchLimit() { + return fetchLimit; + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java new file mode 100644 index 00000000..de5fed6a --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java @@ -0,0 +1,179 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +import java.net.MalformedURLException; +import java.util.List; +import java.util.Map; + +import org.onap.policy.common.endpoints.event.comm.Topic; +import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.CambriaConsumerWrapper; +import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.DmaapDmeConsumerWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This topic reader implementation specializes in reading messages over DMAAP topic and notifying + * its listeners + */ +public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource implements DmaapTopicSource, Runnable { + + private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class); + + + protected final String userName; + protected final String password; + + protected String environment = null; + protected String aftEnvironment = null; + protected String partner = null; + protected String latitude = null; + protected String longitude = null; + + protected Map<String, String> additionalProps = null; + + + /** + * + * @param servers DMaaP servers + * @param topic DMaaP Topic to be monitored + * @param apiKey DMaaP API Key (optional) + * @param apiSecret DMaaP API Secret (optional) + * @param consumerGroup DMaaP Reader Consumer Group + * @param consumerInstance DMaaP Reader Instance + * @param fetchTimeout DMaaP fetch timeout + * @param fetchLimit DMaaP fetch limit + * @param environment DME2 Environment + * @param aftEnvironment DME2 AFT Environment + * @param partner DME2 Partner + * @param latitude DME2 Latitude + * @param longitude DME2 Longitude + * @param additionalProps Additional properties to pass to DME2 + * @param useHttps does connection use HTTPS? + * @param allowSelfSignedCerts are self-signed certificates allow + * + * @throws IllegalArgumentException An invalid parameter passed in + */ + public SingleThreadedDmaapTopicSource(List<String> servers, String topic, String apiKey, String apiSecret, + String userName, String password, String consumerGroup, String consumerInstance, int fetchTimeout, + int fetchLimit, String environment, String aftEnvironment, String partner, String latitude, + String longitude, Map<String, String> additionalProps, boolean useHttps, boolean allowSelfSignedCerts) { + + super(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps, + allowSelfSignedCerts); + + this.userName = userName; + this.password = password; + + this.environment = environment; + this.aftEnvironment = aftEnvironment; + this.partner = partner; + + this.latitude = latitude; + this.longitude = longitude; + + this.additionalProps = additionalProps; + try { + this.init(); + } catch (Exception e) { + logger.error("ERROR during init of topic {}", this.topic); + throw new IllegalArgumentException(e); + } + } + + /** + * + * @param servers DMaaP servers + * @param topic DMaaP Topic to be monitored + * @param apiKey DMaaP API Key (optional) + * @param apiSecret DMaaP API Secret (optional) + * @param consumerGroup DMaaP Reader Consumer Group + * @param consumerInstance DMaaP Reader Instance + * @param fetchTimeout DMaaP fetch timeout + * @param fetchLimit DMaaP fetch limit + * @param useHttps does connection use HTTPS? + * @param allowSelfSignedCerts are self-signed certificates allow + * @throws IllegalArgumentException An invalid parameter passed in + */ + public SingleThreadedDmaapTopicSource(List<String> servers, String topic, String apiKey, String apiSecret, + String userName, String password, String consumerGroup, String consumerInstance, int fetchTimeout, + int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) { + + + super(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps, + allowSelfSignedCerts); + + this.userName = userName; + this.password = password; + + try { + this.init(); + } catch (Exception e) { + logger.warn("dmaap-source: cannot create topic {} because of {}", topic, e.getMessage(), e); + throw new IllegalArgumentException(e); + } + } + + + /** + * Initialize the Cambria or MR Client + */ + @Override + public void init() throws MalformedURLException { + if (anyNullOrEmpty(this.userName, this.password)) { + this.consumer = new CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, + this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit, this.useHttps, + this.allowSelfSignedCerts); + } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) { + this.consumer = new CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, + this.userName, this.password, this.consumerGroup, this.consumerInstance, this.fetchTimeout, + this.fetchLimit, this.useHttps, this.allowSelfSignedCerts); + } else { + this.consumer = new DmaapDmeConsumerWrapper(this.servers, this.topic, this.apiKey, + this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance, + this.fetchTimeout, this.fetchLimit, this.environment, this.aftEnvironment, this.partner, + this.latitude, this.longitude, this.additionalProps, this.useHttps); + } + + logger.info("{}: INITTED", this); + } + + /** + * {@inheritDoc} + */ + @Override + public CommInfrastructure getTopicCommInfrastructure() { + return Topic.CommInfrastructure.DMAAP; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=") + .append((password == null || password.isEmpty()) ? "-" : password.length()) + .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()).append(", toString()=") + .append(super.toString()).append("]"); + return builder.toString(); + } + + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java new file mode 100644 index 00000000..00e45422 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java @@ -0,0 +1,91 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +import java.util.List; + +import org.onap.policy.common.endpoints.event.comm.Topic; +import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.CambriaConsumerWrapper; + +/** + * This topic source implementation specializes in reading messages over an UEB Bus topic source and + * notifying its listeners + */ +public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource implements UebTopicSource { + + /** + * + * @param servers UEB servers + * @param topic UEB Topic to be monitored + * @param apiKey UEB API Key (optional) + * @param apiSecret UEB API Secret (optional) + * @param consumerGroup UEB Reader Consumer Group + * @param consumerInstance UEB Reader Instance + * @param fetchTimeout UEB fetch timeout + * @param fetchLimit UEB fetch limit + * @param useHttps does topicSource use HTTPS? + * @param allowSelfSignedCerts does topicSource allow self-signed certs? + * + * @throws IllegalArgumentException An invalid parameter passed in + */ + + + public SingleThreadedUebTopicSource(List<String> servers, String topic, String apiKey, String apiSecret, + String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps, + boolean allowSelfSignedCerts) { + + super(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps, + allowSelfSignedCerts); + + this.allowSelfSignedCerts = allowSelfSignedCerts; + + this.init(); + } + + /** + * Initialize the Cambria client + */ + @Override + public void init() { + this.consumer = new CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, + this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit, this.useHttps, + this.allowSelfSignedCerts); + } + + /** + * {@inheritDoc} + */ + @Override + public CommInfrastructure getTopicCommInfrastructure() { + return Topic.CommInfrastructure.UEB; + } + + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("SingleThreadedUebTopicSource [getTopicCommInfrastructure()=") + .append(getTopicCommInfrastructure()).append(", toString()=").append(super.toString()).append("]"); + return builder.toString(); + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java new file mode 100644 index 00000000..ed15ddf7 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java @@ -0,0 +1,229 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.collections4.queue.CircularFifoQueue; +import org.onap.policy.common.endpoints.event.comm.Topic; +import org.onap.policy.common.endpoints.event.comm.TopicListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class TopicBase implements Topic { + + /** + * logger + */ + private static Logger logger = LoggerFactory.getLogger(TopicBase.class); + + /** + * list of servers + */ + protected List<String> servers; + + /** + * Topic + */ + protected String topic; + + /** + * event cache + */ + protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<>(10); + + /** + * Am I running? reflects invocation of start()/stop() !locked & start() => alive stop() => + * !alive + */ + protected volatile boolean alive = false; + + /** + * Am I locked? reflects invocation of lock()/unlock() operations locked => !alive (but not in + * the other direction necessarily) locked => !offer, !run, !start, !stop (but this last one is + * obvious since locked => !alive) + */ + protected volatile boolean locked = false; + + /** + * All my subscribers for new message notifications + */ + protected final ArrayList<TopicListener> topicListeners = new ArrayList<>(); + + /** + * Instantiates a new Topic Base + * + * @param servers list of servers + * @param topic topic name + * + * @return a Topic Base + * @throws IllegalArgumentException if invalid parameters are present + */ + public TopicBase(List<String> servers, String topic) { + + if (servers == null || servers.isEmpty()) { + throw new IllegalArgumentException("Server(s) must be provided"); + } + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A Topic must be provided"); + } + + this.servers = servers; + this.topic = topic; + } + + @Override + public void register(TopicListener topicListener) { + + logger.info("{}: registering {}", this, topicListener); + + synchronized (this) { + if (topicListener == null) { + throw new IllegalArgumentException("TopicListener must be provided"); + } + + for (TopicListener listener : this.topicListeners) { + if (listener == topicListener) { + return; + } + } + + this.topicListeners.add(topicListener); + } + } + + @Override + public void unregister(TopicListener topicListener) { + + logger.info("{}: unregistering {}", this, topicListener); + + synchronized (this) { + if (topicListener == null) { + throw new IllegalArgumentException("TopicListener must be provided"); + } + + this.topicListeners.remove(topicListener); + } + } + + /** + * broadcast event to all listeners + * + * @param message the event + * @return true if all notifications are performed with no error, false otherwise + */ + protected boolean broadcast(String message) { + List<TopicListener> snapshotListeners = this.snapshotTopicListeners(); + + boolean success = true; + for (TopicListener topicListener : snapshotListeners) { + try { + topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message); + } catch (Exception e) { + logger.warn("{}: notification error @ {} because of {}", this, topicListener, e.getMessage(), e); + success = false; + } + } + return success; + } + + /** + * take a snapshot of current topic listeners + * + * @return the topic listeners + */ + protected synchronized List<TopicListener> snapshotTopicListeners() { + @SuppressWarnings("unchecked") + List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone(); + return listeners; + } + + @Override + public boolean lock() { + + logger.info("{}: locking", this); + + synchronized (this) { + if (this.locked) { + return true; + } + + this.locked = true; + } + + return this.stop(); + } + + @Override + public boolean unlock() { + logger.info("{}: unlocking", this); + + synchronized (this) { + if (!this.locked) { + return true; + } + + this.locked = false; + } + + try { + return this.start(); + } catch (Exception e) { + logger.warn("{}: cannot after unlocking because of {}", this, e.getMessage(), e); + return false; + } + } + + @Override + public boolean isLocked() { + return this.locked; + } + + @Override + public String getTopic() { + return topic; + } + + @Override + public boolean isAlive() { + return this.alive; + } + + @Override + public List<String> getServers() { + return servers; + } + + @Override + public synchronized String[] getRecentEvents() { + String[] events = new String[recentEvents.size()]; + return recentEvents.toArray(events); + } + + + @Override + public String toString() { + return "TopicBase [servers=" + servers + ", topic=" + topic + ", #recentEvents=" + recentEvents.size() + + ", locked=" + locked + ", #topicListeners=" + topicListeners.size() + "]"; + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/CambriaConsumerWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/CambriaConsumerWrapper.java new file mode 100644 index 00000000..fedde284 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/CambriaConsumerWrapper.java @@ -0,0 +1,210 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal.impl; + +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; +import com.att.nsa.cambria.client.CambriaConsumer; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; +import java.util.List; + +import org.onap.policy.common.endpoints.event.comm.bus.internal.FilterableBusConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Cambria based consumer + */ +public class CambriaConsumerWrapper implements FilterableBusConsumer { + + /** + * logger + */ + private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class); + + /** + * Used to build the consumer. + */ + private final ConsumerBuilder builder; + + /** + * Locked while updating {@link #consumer} and {@link #newConsumer}. + */ + private final Object consLocker = new Object(); + + /** + * Cambria client + */ + private CambriaConsumer consumer; + + /** + * Cambria client to use for next fetch + */ + private CambriaConsumer newConsumer = null; + + /** + * fetch timeout + */ + protected int fetchTimeout; + + /** + * close condition + */ + protected Object closeCondition = new Object(); + + /** + * Cambria Consumer Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + * @throws GeneralSecurityException + * @throws MalformedURLException + */ + public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, + String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps, + boolean useSelfSignedCerts) { + this(servers, topic, apiKey, apiSecret, null, null, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, + useHttps, useSelfSignedCerts); + } + + public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, String username, + String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, + boolean useHttps, boolean useSelfSignedCerts) { + + this.fetchTimeout = fetchTimeout; + + this.builder = new CambriaClientBuilders.ConsumerBuilder(); + + builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic).waitAtServer(fetchTimeout) + .receivingAtMost(fetchLimit); + + // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable) + builder.withSocketTimeout(fetchTimeout + 30000); + + if (useHttps) { + builder.usingHttps(); + + if (useSelfSignedCerts) { + builder.allowSelfSignedCertificates(); + } + } + + if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { + builder.authenticatedBy(apiKey, apiSecret); + } + + if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) { + builder.authenticatedByHttp(username, password); + } + + try { + this.consumer = builder.build(); + } catch (MalformedURLException | GeneralSecurityException e) { + throw new IllegalArgumentException(e); + } + } + + @Override + public Iterable<String> fetch() throws IOException, InterruptedException { + try { + return getCurrentConsumer().fetch(); + } catch (final IOException e) { + logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(), + this.fetchTimeout); + synchronized (this.closeCondition) { + this.closeCondition.wait(this.fetchTimeout); + } + + throw e; + } + } + + @Override + public void close() { + synchronized (closeCondition) { + closeCondition.notifyAll(); + } + + getCurrentConsumer().close(); + } + + private CambriaConsumer getCurrentConsumer() { + CambriaConsumer old = null; + CambriaConsumer ret; + + synchronized (consLocker) { + if (this.newConsumer != null) { + // replace old consumer with new consumer + old = this.consumer; + this.consumer = this.newConsumer; + this.newConsumer = null; + } + + ret = this.consumer; + } + + if (old != null) { + old.close(); + } + + return ret; + } + + @Override + public void setFilter(String filter) { + logger.info("{}: setting DMAAP server-side filter: {}", this, filter); + builder.withServerSideFilter(filter); + + try { + CambriaConsumer previous; + synchronized (consLocker) { + previous = this.newConsumer; + this.newConsumer = builder.build(); + } + + if (previous != null) { + // there was already a new consumer - close it + previous.close(); + } + + } catch (MalformedURLException | GeneralSecurityException e) { + /* + * Since an exception occurred, "consumer" still has its old value, thus it should not + * be closed at this point. + */ + throw new IllegalArgumentException(e); + } + } + + @Override + public String toString() { + return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]"; + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/CambriaPublisherWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/CambriaPublisherWrapper.java new file mode 100644 index 00000000..ab5868ab --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/CambriaPublisherWrapper.java @@ -0,0 +1,122 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal.impl; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; +import com.fasterxml.jackson.annotation.JsonIgnore; + +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; +import java.util.List; + +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Cambria based library publisher + */ +public class CambriaPublisherWrapper implements BusPublisher { + + private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class); + + /** + * The actual Cambria publisher + */ + @JsonIgnore + protected volatile CambriaBatchingPublisher publisher; + + public CambriaPublisherWrapper(List<String> servers, String topic, String apiKey, String apiSecret, + boolean useHttps) { + this(servers, topic, apiKey, apiSecret, null, null, useHttps); + } + + public CambriaPublisherWrapper(List<String> servers, String topic, String apiKey, String apiSecret, String username, + String password, boolean useHttps) { + + PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder(); + + builder.usingHosts(servers).onTopic(topic); + + // Set read timeout to 30 seconds (TBD: this should be configurable) + builder.withSocketTimeout(30000); + + if (useHttps) { + builder.usingHttps(); + } + + + if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { + builder.authenticatedBy(apiKey, apiSecret); + } + + if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) { + builder.authenticatedByHttp(username, password); + } + + try { + this.publisher = builder.build(); + } catch (MalformedURLException | GeneralSecurityException e) { + throw new IllegalArgumentException(e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean send(String partitionId, String message) { + if (message == null) { + throw new IllegalArgumentException("No message provided"); + } + + try { + this.publisher.send(partitionId, message); + } catch (Exception e) { + logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e); + return false; + } + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public void close() { + logger.info("{}: CLOSE", this); + + try { + this.publisher.close(); + } catch (Exception e) { + logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e); + } + } + + + @Override + public String toString() { + return "CambriaPublisherWrapper []"; + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapAafConsumerWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapAafConsumerWrapper.java new file mode 100644 index 00000000..29fbf1d5 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapAafConsumerWrapper.java @@ -0,0 +1,95 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal.impl; + +import com.att.nsa.mr.client.impl.MRConsumerImpl; +import com.att.nsa.mr.test.clients.ProtocolTypeConstants; + +import java.net.MalformedURLException; +import java.util.List; +import java.util.Properties; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MR based consumer + */ +public class DmaapAafConsumerWrapper extends DmaapConsumerWrapper { + + private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class); + + private final Properties props; + + /** + * MR Consumer Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param aafLogin AAF Login + * @param aafPassword AAF Password + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + * @throws MalformedURLException + */ + public DmaapAafConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, String aafLogin, + String aafPassword, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, + boolean useHttps) throws MalformedURLException { + + super(servers, topic, apiKey, apiSecret, aafLogin, aafPassword, consumerGroup, consumerInstance, fetchTimeout, + fetchLimit); + + // super constructor sets servers = {""} if empty to avoid errors when using DME2 + if ((servers.size() == 1 && ("".equals(servers.get(0)))) || (servers == null) || (servers.isEmpty())) { + throw new IllegalArgumentException("Must provide at least one host for HTTP AAF"); + } + + this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + + props = new Properties(); + + if (useHttps) { + props.setProperty(PROTOCOL_PROP, "https"); + this.consumer.setHost(servers.get(0) + ":3905"); + + } else { + props.setProperty(PROTOCOL_PROP, "http"); + this.consumer.setHost(servers.get(0) + ":3904"); + } + + this.consumer.setProps(props); + logger.info("{}: CREATION", this); + } + + @Override + public String toString() { + final MRConsumerImpl consumer = this.consumer; + + return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate() + + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost() + + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()=" + + consumer.getUsername() + "]"; + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapAafPublisherWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapAafPublisherWrapper.java new file mode 100644 index 00000000..1308bb36 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapAafPublisherWrapper.java @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal.impl; + +import com.att.nsa.mr.test.clients.ProtocolTypeConstants; + +import java.util.List; + +/** + * DmaapClient library wrapper + */ +public class DmaapAafPublisherWrapper extends DmaapPublisherWrapper { + /** + * MR based Publisher + */ + public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword, + boolean useHttps) { + + super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps); + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapConsumerWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapConsumerWrapper.java new file mode 100644 index 00000000..0806c3d3 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapConsumerWrapper.java @@ -0,0 +1,146 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal.impl; + +import com.att.nsa.mr.client.impl.MRConsumerImpl; +import com.att.nsa.mr.client.response.MRConsumerResponse; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.List; + +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MR based consumer + */ +public abstract class DmaapConsumerWrapper implements BusConsumer { + + /** + * logger + */ + private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class); + + /** + * Name of the "protocol" property. + */ + protected static final String PROTOCOL_PROP = "Protocol"; + + /** + * fetch timeout + */ + protected int fetchTimeout; + + /** + * close condition + */ + protected Object closeCondition = new Object(); + + /** + * MR Consumer + */ + protected MRConsumerImpl consumer; + + /** + * MR Consumer Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param username AAF Login + * @param password AAF Password + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + * @throws MalformedURLException + */ + public DmaapConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, String username, + String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit) + throws MalformedURLException { + + this.fetchTimeout = fetchTimeout; + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("No topic for DMaaP"); + } + + this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, + null, apiKey, apiSecret); + + this.consumer.setUsername(username); + this.consumer.setPassword(password); + } + + @Override + public Iterable<String> fetch() throws InterruptedException, IOException { + final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse(); + if (response == null) { + logger.warn("{}: DMaaP NULL response received", this); + + synchronized (closeCondition) { + closeCondition.wait(fetchTimeout); + } + return new ArrayList<>(); + } else { + logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(), response.getResponseMessage()); + + if (response.getResponseCode() == null || !"200".equals(response.getResponseCode())) { + + logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(), + response.getResponseMessage()); + + synchronized (closeCondition) { + closeCondition.wait(fetchTimeout); + } + + /* fall through */ + } + } + + if (response.getActualMessages() == null) { + return new ArrayList<>(); + } else { + return response.getActualMessages(); + } + } + + @Override + public void close() { + synchronized (closeCondition) { + closeCondition.notifyAll(); + } + + this.consumer.close(); + } + + @Override + public String toString() { + return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate() + + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost() + + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()=" + + consumer.getUsername() + "]"; + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapDmeConsumerWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapDmeConsumerWrapper.java new file mode 100644 index 00000000..fd998e2b --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapDmeConsumerWrapper.java @@ -0,0 +1,144 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal.impl; + +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.test.clients.ProtocolTypeConstants; + +import java.net.MalformedURLException; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper { + + private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class); + + private final Properties props; + + public DmaapDmeConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, + String dme2Login, String dme2Password, String consumerGroup, String consumerInstance, int fetchTimeout, + int fetchLimit, String environment, String aftEnvironment, String dme2Partner, String latitude, + String longitude, Map<String, String> additionalProps, boolean useHttps) throws MalformedURLException { + + + + super(servers, topic, apiKey, apiSecret, dme2Login, dme2Password, consumerGroup, consumerInstance, fetchTimeout, + fetchLimit); + + + final String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); + + if (environment == null || environment.isEmpty()) { + throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + } + if (aftEnvironment == null || aftEnvironment.isEmpty()) { + throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + } + if (latitude == null || latitude.isEmpty()) { + throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + } + if (longitude == null || longitude.isEmpty()) { + throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + } + + if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { + throw new IllegalArgumentException( + "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); + } + + final String serviceName = servers.get(0); + + this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); + + this.consumer.setUsername(dme2Login); + this.consumer.setPassword(dme2Password); + + props = new Properties(); + + props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); + + props.setProperty("username", dme2Login); + props.setProperty("password", dme2Password); + + /* These are required, no defaults */ + props.setProperty("topic", topic); + + props.setProperty("Environment", environment); + props.setProperty("AFT_ENVIRONMENT", aftEnvironment); + + if (dme2Partner != null) { + props.setProperty("Partner", dme2Partner); + } + if (dme2RouteOffer != null) { + props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + } + + props.setProperty("Latitude", latitude); + props.setProperty("Longitude", longitude); + + /* These are optional, will default to these values if not set in additionalProps */ + props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); + props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); + props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); + props.setProperty("Version", "1.0"); + props.setProperty("SubContextPath", "/"); + props.setProperty("sessionstickinessrequired", "no"); + + /* These should not change */ + props.setProperty("TransportType", "DME2"); + props.setProperty("MethodType", "GET"); + + if (useHttps) { + props.setProperty(PROTOCOL_PROP, "https"); + + } else { + props.setProperty(PROTOCOL_PROP, "http"); + } + + props.setProperty("contenttype", "application/json"); + + if (additionalProps != null) { + for (Map.Entry<String, String> entry : additionalProps.entrySet()) { + props.put(entry.getKey(), entry.getValue()); + } + } + + MRClientFactory.prop = props; + this.consumer.setProps(props); + + logger.info("{}: CREATION", this); + } + + private IllegalArgumentException parmException(String topic, String propnm) { + return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + propnm + " property for DME2 in DMaaP"); + + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapDmePublisherWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapDmePublisherWrapper.java new file mode 100644 index 00000000..e39954ed --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapDmePublisherWrapper.java @@ -0,0 +1,112 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal.impl; + +import com.att.nsa.mr.test.clients.ProtocolTypeConstants; + +import java.util.List; +import java.util.Map; + +import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; + +public class DmaapDmePublisherWrapper extends DmaapPublisherWrapper { + public DmaapDmePublisherWrapper(List<String> servers, String topic, String username, String password, + String environment, String aftEnvironment, String dme2Partner, String latitude, String longitude, + Map<String, String> additionalProps, boolean useHttps) { + + super(ProtocolTypeConstants.DME2, servers, topic, username, password, useHttps); + + + + String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); + + if (environment == null || environment.isEmpty()) { + throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + } + if (aftEnvironment == null || aftEnvironment.isEmpty()) { + throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + } + if (latitude == null || latitude.isEmpty()) { + throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + } + if (longitude == null || longitude.isEmpty()) { + throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + } + + if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { + throw new IllegalArgumentException( + "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); + } + + String serviceName = servers.get(0); + + /* These are required, no defaults */ + props.setProperty("Environment", environment); + props.setProperty("AFT_ENVIRONMENT", aftEnvironment); + + props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); + + if (dme2Partner != null) { + props.setProperty("Partner", dme2Partner); + } + if (dme2RouteOffer != null) { + props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + } + + props.setProperty("Latitude", latitude); + props.setProperty("Longitude", longitude); + + // ServiceName also a default, found in additionalProps + + /* These are optional, will default to these values if not set in optionalProps */ + props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); + props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); + props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); + props.setProperty("Version", "1.0"); + props.setProperty("SubContextPath", "/"); + props.setProperty("sessionstickinessrequired", "no"); + + /* These should not change */ + props.setProperty("TransportType", "DME2"); + props.setProperty("MethodType", "POST"); + + for (Map.Entry<String, String> entry : additionalProps.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + + if (value != null) { + props.setProperty(key, value); + } + } + + this.publisher.setProps(props); + } + + private IllegalArgumentException parmException(String topic, String propnm) { + return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + propnm + " property for DME2 in DMaaP"); + + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapPublisherWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapPublisherWrapper.java new file mode 100644 index 00000000..396580e9 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapPublisherWrapper.java @@ -0,0 +1,164 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal.impl; + +import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; +import com.att.nsa.mr.client.response.MRPublisherResponse; +import com.att.nsa.mr.test.clients.ProtocolTypeConstants; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * DmaapClient library wrapper + */ +public abstract class DmaapPublisherWrapper implements BusPublisher { + + private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class); + + /** + * MR based Publisher + */ + protected MRSimplerBatchPublisher publisher; + protected Properties props; + + /** + * MR Publisher Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param username AAF or DME2 Login + * @param password AAF or DME2 Password + */ + public DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic, String username, + String password, boolean useHttps) { + + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("No topic for DMaaP"); + } + + + if (protocol == ProtocolTypeConstants.AAF_AUTH) { + if (servers == null || servers.isEmpty()) { + throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided"); + } + + ArrayList<String> dmaapServers = new ArrayList<>(); + if (useHttps) { + for (String server : servers) { + dmaapServers.add(server + ":3905"); + } + + } else { + for (String server : servers) { + dmaapServers.add(server + ":3904"); + } + } + + + this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build(); + + this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + } else if (protocol == ProtocolTypeConstants.DME2) { + ArrayList<String> dmaapServers = new ArrayList<>(); + dmaapServers.add("0.0.0.0:3904"); + + this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build(); + + this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); + } + + this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName())); + + this.publisher.setUsername(username); + this.publisher.setPassword(password); + + props = new Properties(); + + if (useHttps) { + props.setProperty("Protocol", "https"); + } else { + props.setProperty("Protocol", "http"); + } + + props.setProperty("contenttype", "application/json"); + props.setProperty("username", username); + props.setProperty("password", password); + + props.setProperty("topic", topic); + + this.publisher.setProps(props); + + if (protocol == ProtocolTypeConstants.AAF_AUTH) { + this.publisher.setHost(servers.get(0)); + } + + logger.info("{}: CREATION: using protocol {}", this, protocol.getValue()); + } + + /** + * {@inheritDoc} + */ + @Override + public void close() { + logger.info("{}: CLOSE", this); + + try { + this.publisher.close(1, TimeUnit.SECONDS); + } catch (Exception e) { + logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean send(String partitionId, String message) { + if (message == null) { + throw new IllegalArgumentException("No message provided"); + } + + this.publisher.setPubResponse(new MRPublisherResponse()); + this.publisher.send(partitionId, message); + MRPublisherResponse response = this.publisher.sendBatchWithResponse(); + if (response != null) { + logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(), response.getResponseMessage()); + } + + return true; + } + + @Override + public String toString() { + return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate() + + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()=" + publisher.getHost() + + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag() + ", publisher.getUsername()=" + + publisher.getUsername() + "]"; + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/impl/ProxyTopicEndpointManager.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/impl/ProxyTopicEndpointManager.java new file mode 100644 index 00000000..779ab74d --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/impl/ProxyTopicEndpointManager.java @@ -0,0 +1,482 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.impl; + +import com.fasterxml.jackson.annotation.JsonIgnore; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.onap.policy.common.capabilities.Startable; +import org.onap.policy.common.endpoints.event.comm.Topic; +import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.impl.IndexedDmaapTopicSinkFactory; +import org.onap.policy.common.endpoints.event.comm.bus.impl.IndexedDmaapTopicSourceFactory; +import org.onap.policy.common.endpoints.event.comm.bus.impl.IndexedNoopTopicSinkFactory; +import org.onap.policy.common.endpoints.event.comm.bus.impl.IndexedUebTopicSinkFactory; +import org.onap.policy.common.endpoints.event.comm.bus.impl.IndexedUebTopicSourceFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This implementation of the Topic Endpoint Manager, proxies operations to appropriate + * implementations according to the communication infrastructure that are supported + */ +public class ProxyTopicEndpointManager implements TopicEndpoint { + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class); + /** + * Is this element locked? + */ + protected volatile boolean locked = false; + + /** + * Is this element alive? + */ + protected volatile boolean alive = false; + + /** + * singleton for global access + */ + private static final TopicEndpoint manager = new ProxyTopicEndpointManager(); + + /** + * Get the singelton instance. + * + * @return the instance + */ + public static TopicEndpoint getInstance() { + return manager; + } + + @Override + public List<TopicSource> addTopicSources(Properties properties) { + + // 1. Create UEB Sources + // 2. Create DMAAP Sources + + final List<TopicSource> sources = new ArrayList<>(); + + sources.addAll(IndexedUebTopicSourceFactory.getInstance().build(properties)); + sources.addAll(IndexedDmaapTopicSourceFactory.getInstance().build(properties)); + + if (this.isLocked()) { + for (final TopicSource source : sources) { + source.lock(); + } + } + + return sources; + } + + @Override + public List<TopicSink> addTopicSinks(Properties properties) { + // 1. Create UEB Sinks + // 2. Create DMAAP Sinks + + final List<TopicSink> sinks = new ArrayList<>(); + + sinks.addAll(IndexedUebTopicSinkFactory.getInstance().build(properties)); + sinks.addAll(IndexedDmaapTopicSinkFactory.getInstance().build(properties)); + sinks.addAll(IndexedNoopTopicSinkFactory.getInstance().build(properties)); + + if (this.isLocked()) { + for (final TopicSink sink : sinks) { + sink.lock(); + } + } + + return sinks; + } + + @Override + public List<TopicSource> getTopicSources() { + + final List<TopicSource> sources = new ArrayList<>(); + + sources.addAll(IndexedUebTopicSourceFactory.getInstance().inventory()); + sources.addAll(IndexedDmaapTopicSourceFactory.getInstance().inventory()); + + return sources; + } + + @Override + public List<TopicSink> getTopicSinks() { + + final List<TopicSink> sinks = new ArrayList<>(); + + sinks.addAll(IndexedUebTopicSinkFactory.getInstance().inventory()); + sinks.addAll(IndexedDmaapTopicSinkFactory.getInstance().inventory()); + sinks.addAll(IndexedNoopTopicSinkFactory.getInstance().inventory()); + + return sinks; + } + + @JsonIgnore + @Override + public List<UebTopicSource> getUebTopicSources() { + return IndexedUebTopicSourceFactory.getInstance().inventory(); + } + + @JsonIgnore + @Override + public List<DmaapTopicSource> getDmaapTopicSources() { + return IndexedDmaapTopicSourceFactory.getInstance().inventory(); + } + + @JsonIgnore + @Override + public List<UebTopicSink> getUebTopicSinks() { + return IndexedUebTopicSinkFactory.getInstance().inventory(); + } + + @JsonIgnore + @Override + public List<DmaapTopicSink> getDmaapTopicSinks() { + return IndexedDmaapTopicSinkFactory.getInstance().inventory(); + } + + @JsonIgnore + @Override + public List<NoopTopicSink> getNoopTopicSinks() { + return IndexedNoopTopicSinkFactory.getInstance().inventory(); + } + + @Override + public boolean start() { + + synchronized (this) { + if (this.locked) { + throw new IllegalStateException(this + " is locked"); + } + + if (this.alive) { + return true; + } + + this.alive = true; + } + + final List<Startable> endpoints = this.getEndpoints(); + + boolean success = true; + for (final Startable endpoint : endpoints) { + try { + success = endpoint.start() && success; + } catch (final Exception e) { + success = false; + logger.error("Problem starting endpoint: {}", endpoint, e); + } + } + + return success; + } + + + @Override + public boolean stop() { + + /* + * stop regardless if it is locked, in other words, stop operation has precedence over + * locks. + */ + synchronized (this) { + this.alive = false; + } + + final List<Startable> endpoints = this.getEndpoints(); + + boolean success = true; + for (final Startable endpoint : endpoints) { + try { + success = endpoint.stop() && success; + } catch (final Exception e) { + success = false; + logger.error("Problem stopping endpoint: {}", endpoint, e); + } + } + + return success; + } + + /** + * + * @return list of managed endpoints + */ + @JsonIgnore + protected List<Startable> getEndpoints() { + final List<Startable> endpoints = new ArrayList<>(); + + endpoints.addAll(this.getTopicSources()); + endpoints.addAll(this.getTopicSinks()); + + return endpoints; + } + + @Override + public void shutdown() { + IndexedUebTopicSourceFactory.getInstance().destroy(); + IndexedUebTopicSinkFactory.getInstance().destroy(); + IndexedNoopTopicSinkFactory.getInstance().destroy(); + + IndexedDmaapTopicSourceFactory.getInstance().destroy(); + IndexedDmaapTopicSinkFactory.getInstance().destroy(); + } + + @Override + public boolean isAlive() { + return this.alive; + } + + @Override + public boolean lock() { + + synchronized (this) { + if (this.locked) { + return true; + } + + this.locked = true; + } + + for (final TopicSource source : this.getTopicSources()) { + source.lock(); + } + + for (final TopicSink sink : this.getTopicSinks()) { + sink.lock(); + } + + return true; + } + + @Override + public boolean unlock() { + synchronized (this) { + if (!this.locked) { + return true; + } + + this.locked = false; + } + + for (final TopicSource source : this.getTopicSources()) { + source.unlock(); + } + + for (final TopicSink sink : this.getTopicSinks()) { + sink.unlock(); + } + + return true; + } + + @Override + public boolean isLocked() { + return this.locked; + } + + @Override + public List<TopicSource> getTopicSources(List<String> topicNames) { + + if (topicNames == null) { + throw new IllegalArgumentException("must provide a list of topics"); + } + + final List<TopicSource> sources = new ArrayList<>(); + for (final String topic : topicNames) { + try { + final TopicSource uebSource = this.getUebTopicSource(topic); + if (uebSource != null) { + sources.add(uebSource); + } + } catch (final Exception e) { + logger.debug("No UEB source for topic: {}", topic, e); + } + + try { + final TopicSource dmaapSource = this.getDmaapTopicSource(topic); + if (dmaapSource != null) { + sources.add(dmaapSource); + } + } catch (final Exception e) { + logger.debug("No DMAAP source for topic: {}", topic, e); + } + } + return sources; + } + + @Override + public List<TopicSink> getTopicSinks(List<String> topicNames) { + + if (topicNames == null) { + throw new IllegalArgumentException("must provide a list of topics"); + } + + final List<TopicSink> sinks = new ArrayList<>(); + for (final String topic : topicNames) { + try { + final TopicSink uebSink = this.getUebTopicSink(topic); + if (uebSink != null) { + sinks.add(uebSink); + } + } catch (final Exception e) { + logger.debug("No UEB sink for topic: {}", topic, e); + } + + try { + final TopicSink dmaapSink = this.getDmaapTopicSink(topic); + if (dmaapSink != null) { + sinks.add(dmaapSink); + } + } catch (final Exception e) { + logger.debug("No DMAAP sink for topic: {}", topic, e); + } + + try { + final TopicSink noopSink = this.getNoopTopicSink(topic); + if (noopSink != null) { + sinks.add(noopSink); + } + } catch (final Exception e) { + logger.debug("No NOOP sink for topic: {}", topic, e); + } + } + return sinks; + } + + @Override + public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) { + + if (commType == null) { + throw parmException(topicName); + } + + if (topicName == null) { + throw parmException(topicName); + } + + switch (commType) { + case UEB: + return this.getUebTopicSource(topicName); + case DMAAP: + return this.getDmaapTopicSource(topicName); + default: + throw new UnsupportedOperationException("Unsupported " + commType.name()); + } + } + + private IllegalArgumentException parmException(String topicName) { + return new IllegalArgumentException( + "Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + @Override + public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) { + if (commType == null) { + throw parmException(topicName); + } + + if (topicName == null) { + throw parmException(topicName); + } + + switch (commType) { + case UEB: + return this.getUebTopicSink(topicName); + case DMAAP: + return this.getDmaapTopicSink(topicName); + case NOOP: + return this.getNoopTopicSink(topicName); + default: + throw new UnsupportedOperationException("Unsupported " + commType.name()); + } + } + + @Override + public List<TopicSink> getTopicSinks(String topicName) { + if (topicName == null) { + throw parmException(topicName); + } + + final List<TopicSink> sinks = new ArrayList<>(); + + try { + sinks.add(this.getUebTopicSink(topicName)); + } catch (final Exception e) { + logNoSink(topicName, e); + } + + try { + sinks.add(this.getDmaapTopicSink(topicName)); + } catch (final Exception e) { + logNoSink(topicName, e); + } + + try { + sinks.add(this.getNoopTopicSink(topicName)); + } catch (final Exception e) { + logNoSink(topicName, e); + } + + return sinks; + } + + private void logNoSink(String topicName, Exception ex) { + logger.debug("No sink for topic: {}", topicName, ex); + } + + @Override + public UebTopicSource getUebTopicSource(String topicName) { + return IndexedUebTopicSourceFactory.getInstance().get(topicName); + } + + @Override + public UebTopicSink getUebTopicSink(String topicName) { + return IndexedUebTopicSinkFactory.getInstance().get(topicName); + } + + @Override + public DmaapTopicSource getDmaapTopicSource(String topicName) { + return IndexedDmaapTopicSourceFactory.getInstance().get(topicName); + } + + @Override + public DmaapTopicSink getDmaapTopicSink(String topicName) { + return IndexedDmaapTopicSinkFactory.getInstance().get(topicName); + } + + @Override + public NoopTopicSink getNoopTopicSink(String topicName) { + return IndexedNoopTopicSinkFactory.getInstance().get(topicName); + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClient.java new file mode 100644 index 00000000..b4aea22c --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClient.java @@ -0,0 +1,54 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.http.client; + +import javax.ws.rs.core.Response; + +import org.onap.policy.common.capabilities.Startable; + +public interface HttpClient extends Startable { + + public Response get(String path); + + public Response get(); + + public static <T> T getBody(Response response, Class<T> entityType) { + return response.readEntity(entityType); + } + + public String getName(); + + public boolean isHttps(); + + public boolean isSelfSignedCerts(); + + public String getHostname(); + + public int getPort(); + + public String getBasePath(); + + public String getUserName(); + + public String getPassword(); + + public String getBaseUrl(); +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClientFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClientFactory.java new file mode 100644 index 00000000..5435ee9c --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClientFactory.java @@ -0,0 +1,68 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.http.client; + +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.util.List; +import java.util.Properties; + +/** + * Http Client Factory + */ +public interface HttpClientFactory { + + /** + * build and http client with the following parameters + */ + public HttpClient build(String name, boolean https, boolean selfSignedCerts, String hostname, int port, + String baseUrl, String userName, String password, boolean managed) + throws KeyManagementException, NoSuchAlgorithmException; + + /** + * build http client from properties + */ + public List<HttpClient> build(Properties properties) throws KeyManagementException, NoSuchAlgorithmException; + + /** + * get http client + * + * @param name the name + * @return the http client + */ + public HttpClient get(String name); + + /** + * list of http clients + * + * @return http clients + */ + public List<HttpClient> inventory(); + + /** + * destroy by name + * + * @param name name + */ + public void destroy(String name); + + public void destroy(); +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/impl/IndexedHttpClientFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/impl/IndexedHttpClientFactory.java new file mode 100644 index 00000000..2e911c9a --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/impl/IndexedHttpClientFactory.java @@ -0,0 +1,185 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.http.client.impl; + +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; + +import org.onap.policy.common.endpoints.http.client.HttpClient; +import org.onap.policy.common.endpoints.http.client.HttpClientFactory; +import org.onap.policy.common.endpoints.http.client.internal.JerseyClient; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * http client factory implementation indexed by name + */ +public class IndexedHttpClientFactory implements HttpClientFactory { + + private static final HttpClientFactory instance = new IndexedHttpClientFactory(); + + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(IndexedHttpClientFactory.class); + + protected HashMap<String, HttpClient> clients = new HashMap<>(); + + /** + * Get the singleton instance. + * + * @return the instance + */ + public static HttpClientFactory getInstance() { + return instance; + } + + private IndexedHttpClientFactory() {} + + @Override + public synchronized HttpClient build(String name, boolean https, boolean selfSignedCerts, String hostname, int port, + String baseUrl, String userName, String password, boolean managed) + throws KeyManagementException, NoSuchAlgorithmException { + if (clients.containsKey(name)) { + return clients.get(name); + } + + JerseyClient client = + new JerseyClient(name, https, selfSignedCerts, hostname, port, baseUrl, userName, password); + + if (managed) { + clients.put(name, client); + } + + return client; + } + + @Override + public synchronized List<HttpClient> build(Properties properties) + throws KeyManagementException, NoSuchAlgorithmException { + ArrayList<HttpClient> clientList = new ArrayList<>(); + + String clientNames = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES); + if (clientNames == null || clientNames.isEmpty()) { + return clientList; + } + + List<String> clientNameList = new ArrayList<>(Arrays.asList(clientNames.split("\\s*,\\s*"))); + + for (String clientName : clientNameList) { + String httpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + + clientName + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + boolean https = false; + if (httpsString != null && !httpsString.isEmpty()) { + https = Boolean.parseBoolean(httpsString); + } + + String hostName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + + clientName + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX); + + String servicePortString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + + "." + clientName + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX); + int port; + try { + if (servicePortString == null || servicePortString.isEmpty()) { + continue; + } + port = Integer.parseInt(servicePortString); + } catch (NumberFormatException nfe) { + logger.error("http-client-factory: cannot parse port {}", servicePortString, nfe); + continue; + } + + String baseUrl = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + + clientName + PolicyEndPointProperties.PROPERTY_HTTP_URL_SUFFIX); + + String userName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + + clientName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX); + + String password = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + + clientName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX); + + String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + + clientName + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + try { + HttpClient client = + this.build(clientName, https, https, hostName, port, baseUrl, userName, password, managed); + clientList.add(client); + } catch (Exception e) { + logger.error("http-client-factory: cannot build client {}", clientName, e); + } + } + + return clientList; + } + + @Override + public synchronized HttpClient get(String name) { + if (clients.containsKey(name)) { + return clients.get(name); + } + + throw new IllegalArgumentException("Http Client " + name + " not found"); + } + + @Override + public synchronized List<HttpClient> inventory() { + return new ArrayList<>(this.clients.values()); + } + + @Override + public synchronized void destroy(String name) { + if (!clients.containsKey(name)) { + return; + } + + HttpClient client = clients.remove(name); + try { + client.shutdown(); + } catch (IllegalStateException e) { + logger.error("http-client-factory: cannot shutdown client {}", client, e); + } + } + + @Override + public void destroy() { + List<HttpClient> clientsInventory = this.inventory(); + for (HttpClient client : clientsInventory) { + client.shutdown(); + } + + synchronized (this) { + this.clients.clear(); + } + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java new file mode 100644 index 00000000..d5e16117 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java @@ -0,0 +1,255 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.http.client.internal; + +import com.fasterxml.jackson.annotation.JsonIgnore; + +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.core.Response; + +import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature; +import org.onap.policy.common.endpoints.http.client.HttpClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JerseyClient implements HttpClient { + + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(JerseyClient.class); + + protected final String name; + protected final boolean https; + protected final boolean selfSignedCerts; + protected final String hostname; + protected final int port; + protected final String basePath; + protected final String userName; + protected final String password; + + protected final Client client; + protected final String baseUrl; + + protected boolean alive = true; + + + public JerseyClient(String name, boolean https, boolean selfSignedCerts, String hostname, int port, String basePath, + String userName, String password) throws KeyManagementException, NoSuchAlgorithmException { + + super(); + + if (name == null || name.isEmpty()) { + throw new IllegalArgumentException("Name must be provided"); + } + + if (hostname == null || hostname.isEmpty()) { + throw new IllegalArgumentException("Hostname must be provided"); + } + + if (port <= 0 && port >= 65535) { + throw new IllegalArgumentException("Invalid Port provided: " + port); + } + + this.name = name; + this.https = https; + this.hostname = hostname; + this.port = port; + this.basePath = basePath; + this.userName = userName; + this.password = password; + this.selfSignedCerts = selfSignedCerts; + + StringBuilder tmpBaseUrl = new StringBuilder(); + if (this.https) { + tmpBaseUrl.append("https://"); + ClientBuilder clientBuilder; + SSLContext sslContext = SSLContext.getInstance("TLSv1.2"); + if (this.selfSignedCerts) { + sslContext.init(null, new TrustManager[] {new X509TrustManager() { + @Override + public void checkClientTrusted(X509Certificate[] chain, String authType) + throws CertificateException { + // always trusted + } + + @Override + public void checkServerTrusted(X509Certificate[] chain, String authType) + throws CertificateException { + // always trusted + } + + @Override + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + + }}, new SecureRandom()); + clientBuilder = + ClientBuilder.newBuilder().sslContext(sslContext).hostnameVerifier((host, session) -> true); + } else { + sslContext.init(null, null, null); + clientBuilder = ClientBuilder.newBuilder().sslContext(sslContext); + } + this.client = clientBuilder.build(); + } else { + tmpBaseUrl.append("http://"); + this.client = ClientBuilder.newClient(); + } + + if (this.userName != null && !this.userName.isEmpty() && this.password != null && !this.password.isEmpty()) { + HttpAuthenticationFeature authFeature = HttpAuthenticationFeature.basic(userName, password); + this.client.register(authFeature); + } + + this.baseUrl = tmpBaseUrl.append(this.hostname).append(":").append(this.port).append("/") + .append((this.basePath == null) ? "" : this.basePath).toString(); + } + + @Override + public Response get(String path) { + if (path != null && !path.isEmpty()) { + return this.client.target(this.baseUrl).path(path).request().get(); + } else { + return this.client.target(this.baseUrl).request().get(); + } + } + + @Override + public Response get() { + return this.client.target(this.baseUrl).request().get(); + } + + + @Override + public boolean start() { + return alive; + } + + @Override + public boolean stop() { + return !alive; + } + + @Override + public void shutdown() { + synchronized (this) { + alive = false; + } + + try { + this.client.close(); + } catch (Exception e) { + logger.warn("{}: cannot close because of {}", this, e.getMessage(), e); + } + } + + @Override + public synchronized boolean isAlive() { + return this.alive; + } + + @Override + public String getName() { + return name; + } + + @Override + public boolean isHttps() { + return https; + } + + @Override + public boolean isSelfSignedCerts() { + return selfSignedCerts; + } + + @Override + public String getHostname() { + return hostname; + } + + @Override + public int getPort() { + return port; + } + + @Override + public String getBasePath() { + return basePath; + } + + @Override + public String getUserName() { + return userName; + } + + @JsonIgnore + @Override + public String getPassword() { + return password; + } + + @Override + public String getBaseUrl() { + return baseUrl; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("JerseyClient [name="); + builder.append(name); + builder.append(", https="); + builder.append(https); + builder.append(", selfSignedCerts="); + builder.append(selfSignedCerts); + builder.append(", hostname="); + builder.append(hostname); + builder.append(", port="); + builder.append(port); + builder.append(", basePath="); + builder.append(basePath); + builder.append(", userName="); + builder.append(userName); + builder.append(", password="); + builder.append(password); + builder.append(", client="); + builder.append(client); + builder.append(", baseUrl="); + builder.append(baseUrl); + builder.append(", alive="); + builder.append(alive); + builder.append("]"); + return builder.toString(); + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServer.java new file mode 100644 index 00000000..d62eb028 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServer.java @@ -0,0 +1,77 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.http.server; + +import org.onap.policy.common.capabilities.Startable; + +/** + * A Jetty Server to server REST Requests + */ +public interface HttpServletServer extends Startable { + + /** + * + * @return port + */ + public int getPort(); + + /** + * enables basic authentication with user and password on the the relative path relativeUriPath + * + * @param user + * @param password + * @param relativeUriPath + */ + public void setBasicAuthentication(String user, String password, String relativeUriPath); + + /** + * adds a JAX-RS servlet class to serve REST requests + * + * @param servletPath servlet path + * @param restClass JAX-RS API Class + * + * @throws IllegalArgumentException unable to process because of invalid input + * @throws IllegalStateException unable to process because of invalid state + */ + public void addServletClass(String servletPath, String restClass); + + /** + * adds a package containing JAX-RS classes to serve REST requests + * + * @param servletPath servlet path + * @param restPackage JAX-RS package to scan + * + * @throws IllegalArgumentException unable to process because of invalid input + * @throws IllegalStateException unable to process because of invalid state + */ + public void addServletPackage(String servletPath, String restPackage); + + /** + * blocking start of the http server + * + * @param maxWaitTime max time to wait for the start to take place + * @return true if start was successful + * + * @throws IllegalArgumentException if arguments are invalid + * @throws InterruptedException if the blocking operation is interrupted + */ + public boolean waitedStart(long maxWaitTime) throws InterruptedException; +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServerFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServerFactory.java new file mode 100644 index 00000000..d0909b58 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServerFactory.java @@ -0,0 +1,81 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.http.server; + +import java.util.List; +import java.util.Properties; + +/** + * Factory of HTTP Servlet-Enabled Servlets + */ +public interface HttpServletServerFactory { + + /** + * builds an http server with support for servlets + * + * @param name name + * @param host binding host + * @param port port + * @param contextPath server base path + * @param swagger enable swagger documentation + * @param managed is it managed by infrastructure + * @return http server + * @throws IllegalArgumentException when invalid parameters are provided + */ + public HttpServletServer build(String name, String host, int port, String contextPath, boolean swagger, + boolean managed); + + /** + * list of http servers per properties + * + * @param properties properties based configuration + * @return list of http servers + * @throws IllegalArgumentException when invalid parameters are provided + */ + public List<HttpServletServer> build(Properties properties); + + /** + * gets a server based on the port + * + * @param port port + * @return http server + */ + public HttpServletServer get(int port); + + /** + * provides an inventory of servers + * + * @return inventory of servers + */ + public List<HttpServletServer> inventory(); + + /** + * destroys server bound to a port + * + * @param port + */ + public void destroy(int port); + + /** + * destroys the factory and therefore all servers + */ + public void destroy(); +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/impl/IndexedHttpServletServerFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/impl/IndexedHttpServletServerFactory.java new file mode 100644 index 00000000..9723d808 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/impl/IndexedHttpServletServerFactory.java @@ -0,0 +1,215 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.http.server.impl; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; + +import org.onap.policy.common.endpoints.http.server.HttpServletServer; +import org.onap.policy.common.endpoints.http.server.HttpServletServerFactory; +import org.onap.policy.common.endpoints.http.server.internal.JettyJerseyServer; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Indexed factory implementation + */ +public class IndexedHttpServletServerFactory implements HttpServletServerFactory { + + private static final HttpServletServerFactory instance = new IndexedHttpServletServerFactory(); + + /** + * Get the singleton instance. + * + * @return the instance + */ + public static HttpServletServerFactory getInstance() { + return instance; + } + + private IndexedHttpServletServerFactory() {} + + private static final String SPACES_COMMA_SPACES = "\\s*,\\s*"; + + /** + * logger + */ + protected static Logger logger = LoggerFactory.getLogger(IndexedHttpServletServerFactory.class); + + /** + * servers index + */ + protected HashMap<Integer, HttpServletServer> servers = new HashMap<>(); + + @Override + public synchronized HttpServletServer build(String name, String host, int port, String contextPath, boolean swagger, + boolean managed) { + + if (servers.containsKey(port)) { + return servers.get(port); + } + + JettyJerseyServer server = new JettyJerseyServer(name, host, port, contextPath, swagger); + if (managed) { + servers.put(port, server); + } + + return server; + } + + @Override + public synchronized List<HttpServletServer> build(Properties properties) { + + ArrayList<HttpServletServer> serviceList = new ArrayList<>(); + + String serviceNames = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES); + if (serviceNames == null || serviceNames.isEmpty()) { + logger.warn("No topic for HTTP Service: {}", properties); + return serviceList; + } + + List<String> serviceNameList = Arrays.asList(serviceNames.split(SPACES_COMMA_SPACES)); + + for (String serviceName : serviceNameList) { + String servicePortString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX); + + int servicePort; + try { + if (servicePortString == null || servicePortString.isEmpty()) { + if (logger.isWarnEnabled()) { + logger.warn("No HTTP port for service in {}", serviceName); + } + continue; + } + servicePort = Integer.parseInt(servicePortString); + } catch (NumberFormatException nfe) { + if (logger.isWarnEnabled()) { + logger.warn("No HTTP port for service in {}", serviceName); + } + continue; + } + + String hostName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + serviceName + + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX); + + String contextUriPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_CONTEXT_URIPATH_SUFFIX); + + String userName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + serviceName + + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX); + + String password = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + serviceName + + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX); + + String authUriPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_URIPATH_SUFFIX); + + String restClasses = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX); + + String restPackages = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_REST_PACKAGES_SUFFIX); + String restUriPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_REST_URIPATH_SUFFIX); + + String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + String swaggerString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX); + boolean swagger = false; + if (swaggerString != null && !swaggerString.isEmpty()) { + swagger = Boolean.parseBoolean(swaggerString); + } + + HttpServletServer service = build(serviceName, hostName, servicePort, contextUriPath, swagger, managed); + if (userName != null && !userName.isEmpty() && password != null && !password.isEmpty()) { + service.setBasicAuthentication(userName, password, authUriPath); + } + + if (restClasses != null && !restClasses.isEmpty()) { + List<String> restClassesList = Arrays.asList(restClasses.split(SPACES_COMMA_SPACES)); + for (String restClass : restClassesList) { + service.addServletClass(restUriPath, restClass); + } + } + + if (restPackages != null && !restPackages.isEmpty()) { + List<String> restPackageList = Arrays.asList(restPackages.split(SPACES_COMMA_SPACES)); + for (String restPackage : restPackageList) { + service.addServletPackage(restUriPath, restPackage); + } + } + + serviceList.add(service); + } + + return serviceList; + } + + @Override + public synchronized HttpServletServer get(int port) { + + if (servers.containsKey(port)) { + return servers.get(port); + } + + throw new IllegalArgumentException("Http Server for " + port + " not found"); + } + + @Override + public synchronized List<HttpServletServer> inventory() { + return new ArrayList<>(this.servers.values()); + } + + @Override + public synchronized void destroy(int port) { + + if (!servers.containsKey(port)) { + return; + } + + HttpServletServer server = servers.remove(port); + server.shutdown(); + } + + @Override + public synchronized void destroy() { + List<HttpServletServer> httpServletServers = this.inventory(); + for (HttpServletServer server : httpServletServers) { + server.shutdown(); + } + + synchronized (this) { + this.servers.clear(); + } + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyJerseyServer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyJerseyServer.java new file mode 100644 index 00000000..cd286927 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyJerseyServer.java @@ -0,0 +1,254 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.http.server.internal; + +import java.util.HashMap; + +import org.eclipse.jetty.servlet.ServletHolder; +import org.onap.policy.common.utils.network.NetworkUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.swagger.jersey.config.JerseyJaxrsConfig; + +/** + * REST Jetty Server that uses Jersey Servlets to support JAX-RS Web Services + */ +public class JettyJerseyServer extends JettyServletServer { + + /** + * Swagger API Base Path + */ + protected static final String SWAGGER_API_BASEPATH = "swagger.api.basepath"; + + /** + * Swagger Context ID + */ + protected static final String SWAGGER_CONTEXT_ID = "swagger.context.id"; + + /** + * Swagger Scanner ID + */ + protected static final String SWAGGER_SCANNER_ID = "swagger.scanner.id"; + + /** + * Swagger Pretty Print + */ + protected static final String SWAGGER_PRETTY_PRINT = "swagger.pretty.print"; + + /** + * Swagger Packages + */ + protected static final String SWAGGER_INIT_PACKAGES_PARAM_VALUE = "io.swagger.jaxrs.listing"; + + /** + * Jersey Packages Init Param Name + */ + protected static final String JERSEY_INIT_PACKAGES_PARAM_NAME = "jersey.config.server.provider.packages"; + + /** + * Jersey Packages Init Param Value + */ + protected static final String JERSEY_INIT_PACKAGES_PARAM_VALUE = "com.fasterxml.jackson.jaxrs.json"; + + /** + * Jersey Classes Init Param Name + */ + protected static final String JERSEY_INIT_CLASSNAMES_PARAM_NAME = "jersey.config.server.provider.classnames"; + + /** + * Jersey Jackson Classes Init Param Value + */ + protected static final String JERSEY_JACKSON_INIT_CLASSNAMES_PARAM_VALUE = + "com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider"; + + /** + * Jersey Swagger Classes Init Param Value + */ + protected static final String SWAGGER_INIT_CLASSNAMES_PARAM_VALUE = + "io.swagger.jaxrs.listing.ApiListingResource," + "io.swagger.jaxrs.listing.SwaggerSerializers"; + /** + * Logger + */ + protected static Logger logger = LoggerFactory.getLogger(JettyJerseyServer.class); + + /** + * Container for servlets + */ + protected HashMap<String, ServletHolder> servlets = new HashMap<>(); + + /** + * Swagger ID + */ + protected String swaggerId = null; + + /** + * Constructor + * + * @param name name + * @param host host server host + * @param port port server port + * @param swagger support swagger? + * @param contextPath context path + * + * @throws IllegalArgumentException in invalid arguments are provided + */ + public JettyJerseyServer(String name, String host, int port, String contextPath, boolean swagger) { + + super(name, host, port, contextPath); + if (swagger) { + this.swaggerId = "swagger-" + this.port; + attachSwaggerServlet(); + } + } + + /** + * attaches a swagger initialization servlet + */ + protected void attachSwaggerServlet() { + + ServletHolder swaggerServlet = context.addServlet(JerseyJaxrsConfig.class, "/"); + + String hostname = this.connector.getHost(); + if (hostname == null || hostname.isEmpty() || hostname.equals(NetworkUtil.IPv4_WILDCARD_ADDRESS)) { + hostname = NetworkUtil.getHostname(); + } + + swaggerServlet.setInitParameter(SWAGGER_API_BASEPATH, + "http://" + hostname + ":" + this.connector.getPort() + "/"); + swaggerServlet.setInitParameter(SWAGGER_CONTEXT_ID, swaggerId); + swaggerServlet.setInitParameter(SWAGGER_SCANNER_ID, swaggerId); + swaggerServlet.setInitParameter(SWAGGER_PRETTY_PRINT, "true"); + swaggerServlet.setInitOrder(2); + + if (logger.isDebugEnabled()) { + logger.debug("{}: Swagger Servlet has been attached: {}", this, swaggerServlet.dump()); + } + } + + /** + * retrieves cached server based on servlet path + * + * @param servletPath servlet path + * @return the jetty servlet holder + * + * @throws IllegalArgumentException if invalid arguments are provided + */ + protected synchronized ServletHolder getServlet(String servletPath) { + + ServletHolder jerseyServlet = servlets.get(servletPath); + if (jerseyServlet == null) { + jerseyServlet = context.addServlet(org.glassfish.jersey.servlet.ServletContainer.class, servletPath); + jerseyServlet.setInitOrder(0); + servlets.put(servletPath, jerseyServlet); + } + + return jerseyServlet; + } + + @Override + public synchronized void addServletPackage(String servletPath, String restPackage) { + String servPath = servletPath; + if (restPackage == null || restPackage.isEmpty()) { + throw new IllegalArgumentException("No discoverable REST package provided"); + } + + if (servPath == null || servPath.isEmpty()) { + servPath = "/*"; + } + + ServletHolder jerseyServlet = this.getServlet(servPath); + + String initClasses = jerseyServlet.getInitParameter(JERSEY_INIT_CLASSNAMES_PARAM_NAME); + if (initClasses != null && !initClasses.isEmpty()) { + logger.warn("Both packages and classes are used in Jetty+Jersey Configuration: {}", restPackage); + } + + String initPackages = jerseyServlet.getInitParameter(JERSEY_INIT_PACKAGES_PARAM_NAME); + if (initPackages == null) { + if (this.swaggerId != null) { + initPackages = + JERSEY_INIT_PACKAGES_PARAM_VALUE + "," + SWAGGER_INIT_PACKAGES_PARAM_VALUE + "," + restPackage; + + jerseyServlet.setInitParameter(SWAGGER_CONTEXT_ID, swaggerId); + jerseyServlet.setInitParameter(SWAGGER_SCANNER_ID, swaggerId); + } else { + initPackages = JERSEY_INIT_PACKAGES_PARAM_VALUE + "," + restPackage; + } + } else { + initPackages = initPackages + "," + restPackage; + } + + jerseyServlet.setInitParameter(JERSEY_INIT_PACKAGES_PARAM_NAME, initPackages); + + if (logger.isDebugEnabled()) { + logger.debug("{}: added REST package: {}", this, jerseyServlet.dump()); + } + } + + @Override + public synchronized void addServletClass(String servletPath, String restClass) { + + if (restClass == null || restClass.isEmpty()) { + throw new IllegalArgumentException("No discoverable REST class provided"); + } + + if (servletPath == null || servletPath.isEmpty()) { + servletPath = "/*"; + } + + ServletHolder jerseyServlet = this.getServlet(servletPath); + + String initPackages = jerseyServlet.getInitParameter(JERSEY_INIT_PACKAGES_PARAM_NAME); + if (initPackages != null && !initPackages.isEmpty()) { + logger.warn("Both classes and packages are used in Jetty+Jersey Configuration: {}", restClass); + } + + String initClasses = jerseyServlet.getInitParameter(JERSEY_INIT_CLASSNAMES_PARAM_NAME); + if (initClasses == null) { + if (this.swaggerId != null) { + initClasses = JERSEY_JACKSON_INIT_CLASSNAMES_PARAM_VALUE + "," + SWAGGER_INIT_CLASSNAMES_PARAM_VALUE + + "," + restClass; + + jerseyServlet.setInitParameter(SWAGGER_CONTEXT_ID, swaggerId); + jerseyServlet.setInitParameter(SWAGGER_SCANNER_ID, swaggerId); + } else { + initClasses = JERSEY_JACKSON_INIT_CLASSNAMES_PARAM_VALUE + "," + restClass; + } + } else { + initClasses = initClasses + "," + restClass; + } + + jerseyServlet.setInitParameter(JERSEY_INIT_CLASSNAMES_PARAM_NAME, initClasses); + + if (logger.isDebugEnabled()) { + logger.debug("{}: added REST class: {}", this, jerseyServlet.dump()); + } + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("JettyJerseyServer [servlets=").append(servlets).append(", swaggerId=").append(swaggerId) + .append(", toString()=").append(super.toString()).append("]"); + return builder.toString(); + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyServletServer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyServletServer.java new file mode 100644 index 00000000..97166ec7 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyServletServer.java @@ -0,0 +1,399 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.http.server.internal; + +import com.fasterxml.jackson.annotation.JsonIgnore; + +import org.eclipse.jetty.security.ConstraintMapping; +import org.eclipse.jetty.security.ConstraintSecurityHandler; +import org.eclipse.jetty.security.HashLoginService; +import org.eclipse.jetty.security.authentication.BasicAuthenticator; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.Slf4jRequestLog; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.util.security.Constraint; +import org.eclipse.jetty.util.security.Credential; +import org.onap.policy.common.endpoints.http.server.HttpServletServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Http Server implementation using Embedded Jetty + */ +public abstract class JettyServletServer implements HttpServletServer, Runnable { + + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(JettyServletServer.class); + + /** + * server name + */ + protected final String name; + + /** + * server host address + */ + protected final String host; + + /** + * server port to bind + */ + protected final int port; + + /** + * server auth user name + */ + protected String user; + + /** + * server auth password name + */ + protected String password; + + /** + * server base context path + */ + protected final String contextPath; + + /** + * embedded jetty server + */ + protected final Server jettyServer; + + /** + * servlet context + */ + protected final ServletContextHandler context; + + /** + * jetty connector + */ + protected final ServerConnector connector; + + /** + * jetty thread + */ + protected volatile Thread jettyThread; + + /** + * start condition + */ + protected Object startCondition = new Object(); + + /** + * constructor + * + * @param name server name + * @param host server host + * @param port server port + * @param contextPath context path + * + * @throws IllegalArgumentException if invalid parameters are passed in + */ + public JettyServletServer(String name, String host, int port, String contextPath) { + String srvName = name; + String srvHost = host; + String ctxtPath = contextPath; + + if (srvName == null || srvName.isEmpty()) { + srvName = "http-" + port; + } + + if (port <= 0 && port >= 65535) { + throw new IllegalArgumentException("Invalid Port provided: " + port); + } + + if (srvHost == null || srvHost.isEmpty()) { + srvHost = "localhost"; + } + + if (ctxtPath == null || ctxtPath.isEmpty()) { + ctxtPath = "/"; + } + + this.name = srvName; + + this.host = srvHost; + this.port = port; + + this.contextPath = ctxtPath; + + this.context = new ServletContextHandler(ServletContextHandler.SESSIONS); + this.context.setContextPath(ctxtPath); + + this.jettyServer = new Server(); + this.jettyServer.setRequestLog(new Slf4jRequestLog()); + + this.connector = new ServerConnector(this.jettyServer); + this.connector.setName(srvName); + this.connector.setReuseAddress(true); + this.connector.setPort(port); + this.connector.setHost(srvHost); + + this.jettyServer.addConnector(this.connector); + this.jettyServer.setHandler(context); + } + + @Override + public void setBasicAuthentication(String user, String password, String servletPath) { + String srvltPath = servletPath; + + if (user == null || user.isEmpty() || password == null || password.isEmpty()) { + throw new IllegalArgumentException("Missing user and/or password"); + } + + if (srvltPath == null || srvltPath.isEmpty()) { + srvltPath = "/*"; + } + + HashLoginService hashLoginService = new HashLoginService(); + hashLoginService.putUser(user, Credential.getCredential(password), new String[] {"user"}); + hashLoginService.setName(this.connector.getName() + "-login-service"); + + Constraint constraint = new Constraint(); + constraint.setName(Constraint.__BASIC_AUTH); + constraint.setRoles(new String[] {"user"}); + constraint.setAuthenticate(true); + + ConstraintMapping constraintMapping = new ConstraintMapping(); + constraintMapping.setConstraint(constraint); + constraintMapping.setPathSpec(srvltPath); + + ConstraintSecurityHandler securityHandler = new ConstraintSecurityHandler(); + securityHandler.setAuthenticator(new BasicAuthenticator()); + securityHandler.setRealmName(this.connector.getName() + "-realm"); + securityHandler.addConstraintMapping(constraintMapping); + securityHandler.setLoginService(hashLoginService); + + this.context.setSecurityHandler(securityHandler); + + this.user = user; + this.password = password; + } + + /** + * jetty server execution + */ + @Override + public void run() { + try { + logger.info("{}: STARTING", this); + + this.jettyServer.start(); + + if (logger.isInfoEnabled()) { + logger.info("{}: STARTED: {}", this, this.jettyServer.dump()); + } + + synchronized (this.startCondition) { + this.startCondition.notifyAll(); + } + + this.jettyServer.join(); + } catch (Exception e) { + logger.error("{}: error found while bringing up server", this, e); + } + } + + @Override + public boolean waitedStart(long maxWaitTime) throws InterruptedException { + logger.info("{}: WAITED-START", this); + + if (maxWaitTime < 0) { + throw new IllegalArgumentException("max-wait-time cannot be negative"); + } + + long pendingWaitTime = maxWaitTime; + + if (!this.start()) { + return false; + } + + synchronized (this.startCondition) { + + while (!this.jettyServer.isRunning()) { + try { + long startTs = System.currentTimeMillis(); + + this.startCondition.wait(pendingWaitTime); + + if (maxWaitTime == 0) { + /* spurious notification */ + continue; + } + + long endTs = System.currentTimeMillis(); + pendingWaitTime = pendingWaitTime - (endTs - startTs); + + logger.info("{}: pending time is {} ms.", this, pendingWaitTime); + + if (pendingWaitTime <= 0) { + return false; + } + + } catch (InterruptedException e) { + logger.warn("{}: waited-start has been interrupted", this); + throw e; + } + } + + return this.jettyServer.isRunning(); + } + } + + @Override + public boolean start() { + logger.info("{}: STARTING", this); + + synchronized (this) { + if (jettyThread == null || !this.jettyThread.isAlive()) { + + this.jettyThread = new Thread(this); + this.jettyThread.setName(this.name + "-" + this.port); + this.jettyThread.start(); + } + } + + return true; + } + + @Override + public boolean stop() { + logger.info("{}: STOPPING", this); + + synchronized (this) { + if (jettyThread == null) { + return true; + } + + if (!jettyThread.isAlive()) { + this.jettyThread = null; + } + + try { + this.connector.stop(); + } catch (Exception e) { + logger.error("{}: error while stopping management server", this, e); + } + + try { + this.jettyServer.stop(); + } catch (Exception e) { + logger.error("{}: error while stopping management server", this, e); + return false; + } + + Thread.yield(); + } + + return true; + } + + @Override + public void shutdown() { + logger.info("{}: SHUTTING DOWN", this); + + this.stop(); + + if (this.jettyThread == null) { + return; + } + + Thread jettyThreadCopy = this.jettyThread; + + if (jettyThreadCopy.isAlive()) { + try { + jettyThreadCopy.join(2000L); + } catch (InterruptedException e) { + logger.warn("{}: error while shutting down management server", this); + Thread.currentThread().interrupt(); + } + if (!jettyThreadCopy.isInterrupted()) { + try { + jettyThreadCopy.interrupt(); + } catch (Exception e) { + // do nothing + logger.warn("{}: exception while shutting down (OK)", this, e); + } + } + } + + this.jettyServer.destroy(); + } + + @Override + public boolean isAlive() { + if (this.jettyThread != null) { + return this.jettyThread.isAlive(); + } + + return false; + } + + @Override + public int getPort() { + return this.port; + } + + /** + * @return the name + */ + public String getName() { + return name; + } + + /** + * @return the host + */ + public String getHost() { + return host; + } + + /** + * @return the user + */ + public String getUser() { + return user; + } + + /** + * @return the password + */ + @JsonIgnore + public String getPassword() { + return password; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("JettyServer [name=").append(name).append(", host=").append(host).append(", port=").append(port) + .append(", user=").append(user).append(", password=").append(password != null).append(", contextPath=") + .append(contextPath).append(", jettyServer=").append(jettyServer).append(", context=") + .append(this.context).append(", connector=").append(connector).append(", jettyThread=") + .append(jettyThread).append("]"); + return builder.toString(); + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java new file mode 100644 index 00000000..38f9b94d --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java @@ -0,0 +1,100 @@ +/*- + * ============LICENSE_START======================================================= + * policy-core + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.properties; + +public interface PolicyEndPointProperties { + + /* Generic property suffixes */ + + public static final String PROPERTY_TOPIC_SERVERS_SUFFIX = ".servers"; + public static final String PROPERTY_TOPIC_API_KEY_SUFFIX = ".apiKey"; + public static final String PROPERTY_TOPIC_API_SECRET_SUFFIX = ".apiSecret"; + public static final String PROPERTY_TOPIC_AAF_MECHID_SUFFIX = ".aafMechId"; + public static final String PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX = ".aafPassword"; + public static final String PROPERTY_TOPIC_EVENTS_SUFFIX = ".events"; + public static final String PROPERTY_TOPIC_EVENTS_FILTER_SUFFIX = ".filter"; + public static final String PROPERTY_TOPIC_EVENTS_CUSTOM_MODEL_CODER_GSON_SUFFIX = ".events.custom.gson"; + public static final String PROPERTY_TOPIC_EVENTS_CUSTOM_MODEL_CODER_JACKSON_SUFFIX = ".events.custom.jackson"; + + public static final String PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX = ".consumerGroup"; + public static final String PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX = ".consumerInstance"; + public static final String PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX = ".fetchTimeout"; + public static final String PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX = ".fetchLimit"; + public static final String PROPERTY_MANAGED_SUFFIX = ".managed"; + + public static final String PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX = ".partitionKey"; + + public static final String PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX = ".selfSignedCertificates"; + + /* UEB Properties */ + + public static final String PROPERTY_UEB_SOURCE_TOPICS = "ueb.source.topics"; + public static final String PROPERTY_UEB_SINK_TOPICS = "ueb.sink.topics"; + + /* DMAAP Properties */ + + public static final String PROPERTY_DMAAP_SOURCE_TOPICS = "dmaap.source.topics"; + public static final String PROPERTY_DMAAP_SINK_TOPICS = "dmaap.sink.topics"; + + public static final String PROPERTY_DMAAP_DME2_PARTNER_SUFFIX = ".dme2.partner"; + public static final String PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX = ".dme2.routeOffer"; + public static final String PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX = ".dme2.environment"; + public static final String PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX = ".dme2.aft.environment"; + public static final String PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX = ".dme2.latitude"; + public static final String PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX = ".dme2.longitude"; + + public static final String PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX = ".dme2.epReadTimeoutMs"; + public static final String PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX = ".dme2.epConnTimeout"; + public static final String PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX = ".dme2.roundtripTimeoutMs"; + public static final String PROPERTY_DMAAP_DME2_VERSION_SUFFIX = ".dme2.version"; + public static final String PROPERTY_DMAAP_DME2_SERVICE_NAME_SUFFIX = ".dme2.serviceName"; + public static final String PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX = ".dme2.subContextPath"; + public static final String PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX = + ".dme2.sessionStickinessRequired"; + + public static final String PROPERTY_NOOP_SINK_TOPICS = "noop.sink.topics"; + + /* HTTP Server Properties */ + + public static final String PROPERTY_HTTP_SERVER_SERVICES = "http.server.services"; + + public static final String PROPERTY_HTTP_HOST_SUFFIX = ".host"; + public static final String PROPERTY_HTTP_PORT_SUFFIX = ".port"; + public static final String PROPERTY_HTTP_CONTEXT_URIPATH_SUFFIX = ".contextUriPath"; + + public static final String PROPERTY_HTTP_AUTH_USERNAME_SUFFIX = ".userName"; + public static final String PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX = ".password"; + public static final String PROPERTY_HTTP_AUTH_URIPATH_SUFFIX = ".authUriPath"; + + public static final String PROPERTY_HTTP_REST_CLASSES_SUFFIX = ".restClasses"; + public static final String PROPERTY_HTTP_REST_PACKAGES_SUFFIX = ".restPackages"; + public static final String PROPERTY_HTTP_REST_URIPATH_SUFFIX = ".restUriPath"; + + public static final String PROPERTY_HTTP_HTTPS_SUFFIX = ".https"; + public static final String PROPERTY_HTTP_SWAGGER_SUFFIX = ".swagger"; + + /* HTTP Client Properties */ + + public static final String PROPERTY_HTTP_CLIENT_SERVICES = "http.client.services"; + + public static final String PROPERTY_HTTP_URL_SUFFIX = PROPERTY_HTTP_CONTEXT_URIPATH_SUFFIX; + +} |