diff options
Diffstat (limited to 'policy-endpoints/src/main')
77 files changed, 1516 insertions, 3518 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/FilterableTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/FilterableTopicSource.java deleted file mode 100644 index 27f4ceb7..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/FilterableTopicSource.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm; - -/** - * TopicSource that supports server-side filtering. - */ -public interface FilterableTopicSource extends TopicSource { - - /** - * Sets the server-side filter. - * - * @param filter new filter value, or {@code null} - * @throws UnsupportedOperationException if the consumer does not support - * server-side filtering - * @throws IllegalArgumentException if the consumer cannot be built with the - * new filter - */ - public void setFilter(String filter); - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java index 6532a198..ce8e2387 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java @@ -4,6 +4,7 @@ * ================================================================================ * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2019 Samsung Electronics Co., Ltd. + * Copyright (C) 2022,2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,13 +37,9 @@ public interface Topic extends TopicRegisterable, Startable, Lockable { */ enum CommInfrastructure { /** - * UEB Communication Infrastructure. + * KAFKA Communication Infrastructure. */ - UEB, - /** - * DMAAP Communication Infrastructure. - */ - DMAAP, + KAFKA, /** * NOOP for internal use only. */ diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java index bb707523..bf261def 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java @@ -3,6 +3,7 @@ * ONAP * ================================================================================ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2022,2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,17 +25,15 @@ import java.util.List; import java.util.Properties; import org.onap.policy.common.capabilities.Lockable; import org.onap.policy.common.capabilities.Startable; -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; import org.onap.policy.common.endpoints.parameters.TopicParameters; /** - * Abstraction to managed the system's Networked Topic Endpoints, sources of all events input into + * Abstraction to manage the system's Networked Topic Endpoints, sources of all events input into * the System. */ public interface TopicEndpoint extends Startable, Lockable { @@ -128,36 +127,20 @@ public interface TopicEndpoint extends Startable, Lockable { TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName); /** - * Get the UEB Topic Source for the given topic name. - * - * @param topicName the topic name - * - * @return the UEB Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - UebTopicSource getUebTopicSource(String topicName); - - /** - * Get the DMAAP Topic Source for the given topic name. - * - * @param topicName the topic name + * Get the Noop Source for the given topic name. * - * @return the DMAAP Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present + * @param topicName the topic name. + * @return the Noop Source. */ - DmaapTopicSource getDmaapTopicSource(String topicName); + NoopTopicSource getNoopTopicSource(String topicName); /** - * Get the Noop Source for the given topic name. + * Get the Kafka Source for the given topic name. * * @param topicName the topic name. - * @return the Noop Source. + * @return the Kafka Source. */ - NoopTopicSource getNoopTopicSource(String topicName); + KafkaTopicSource getKafkaTopicSource(String topicName); /** * Get the Topic Sinks for the given topic name. @@ -201,18 +184,6 @@ public interface TopicEndpoint extends Startable, Lockable { TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName); /** - * Get the UEB Topic Source for the given topic name. - * - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for example multiple - * TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - UebTopicSink getUebTopicSink(String topicName); - - /** * Get the no-op Topic Sink for the given topic name. * * @param topicName the topic name @@ -225,7 +196,7 @@ public interface TopicEndpoint extends Startable, Lockable { NoopTopicSink getNoopTopicSink(String topicName); /** - * Get the DMAAP Topic Source for the given topic name. + * Get the KAFKA Topic Source for the given topic name. * * @param topicName the topic name * @@ -234,21 +205,14 @@ public interface TopicEndpoint extends Startable, Lockable { * TopicReaders for a topic name and communication infrastructure * @throws IllegalArgumentException if invalid parameters are present */ - DmaapTopicSink getDmaapTopicSink(String topicName); - - /** - * Gets only the UEB Topic Sources. - * - * @return the UEB Topic Source List - */ - List<UebTopicSource> getUebTopicSources(); + KafkaTopicSink getKafkaTopicSink(String topicName); /** - * Gets only the DMAAP Topic Sources. + * Gets only the KAFKA Topic Sources. * - * @return the DMAAP Topic Source List + * @return the KAFKA Topic Source List */ - List<DmaapTopicSource> getDmaapTopicSources(); + List<KafkaTopicSource> getKafkaTopicSources(); /** * Gets only the NOOP Topic Sources. @@ -258,18 +222,11 @@ public interface TopicEndpoint extends Startable, Lockable { List<NoopTopicSource> getNoopTopicSources(); /** - * Gets only the UEB Topic Sinks. + * Gets only the KAFKA Topic Sinks. * - * @return the UEB Topic Sink List + * @return the KAFKA Topic Sinks List */ - List<UebTopicSink> getUebTopicSinks(); - - /** - * Gets only the DMAAP Topic Sinks. - * - * @return the DMAAP Topic Sink List - */ - List<DmaapTopicSink> getDmaapTopicSinks(); + List<KafkaTopicSink> getKafkaTopicSinks(); /** * Gets only the NOOP Topic Sinks. @@ -277,4 +234,5 @@ public interface TopicEndpoint extends Startable, Lockable { * @return the NOOP Topic Sinks List */ List<NoopTopicSink> getNoopTopicSinks(); + } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointManager.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointManager.java index c390afc6..fb18a307 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointManager.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointManager.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,8 +20,11 @@ package org.onap.policy.common.endpoints.event.comm; +import lombok.AccessLevel; import lombok.Getter; +import lombok.NoArgsConstructor; +@NoArgsConstructor(access = AccessLevel.PRIVATE) public class TopicEndpointManager { /** @@ -29,9 +32,4 @@ public class TopicEndpointManager { */ @Getter private static TopicEndpoint manager = new TopicEndpointProxy(); - - - private TopicEndpointManager() { - // do nothing - } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java index b86532fc..98fbbf0b 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java @@ -2,7 +2,8 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2022-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,17 +24,16 @@ package org.onap.policy.common.endpoints.event.comm; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Properties; +import lombok.Getter; import org.onap.policy.common.capabilities.Startable; -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicFactories; -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicFactories; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicFactories; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; import org.onap.policy.common.endpoints.parameters.TopicParameters; import org.onap.policy.common.gson.annotation.GsonJsonIgnore; @@ -44,7 +44,8 @@ import org.slf4j.LoggerFactory; * This implementation of the Topic Endpoint Manager, proxies operations to the appropriate * implementation(s). */ -class TopicEndpointProxy implements TopicEndpoint { +@Getter +public class TopicEndpointProxy implements TopicEndpoint { /** * Logger. */ @@ -70,9 +71,9 @@ class TopicEndpointProxy implements TopicEndpoint { @Override public List<Topic> addTopics(TopicParameterGroup params) { List<TopicParameters> sinks = - (params.getTopicSinks() != null ? params.getTopicSinks() : Collections.emptyList()); + (params.getTopicSinks() != null ? params.getTopicSinks() : Collections.emptyList()); List<TopicParameters> sources = - (params.getTopicSources() != null ? params.getTopicSources() : Collections.emptyList()); + (params.getTopicSources() != null ? params.getTopicSources() : Collections.emptyList()); List<Topic> topics = new ArrayList<>(sinks.size() + sources.size()); topics.addAll(addTopicSources(sources)); @@ -86,18 +87,15 @@ class TopicEndpointProxy implements TopicEndpoint { for (TopicParameters param : paramList) { switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) { - case UEB: - sources.add(UebTopicFactories.getSourceFactory().build(param)); - break; - case DMAAP: - sources.add(DmaapTopicFactories.getSourceFactory().build(param)); + case KAFKA: + sources.add(KafkaTopicFactories.getSourceFactory().build(param)); break; case NOOP: sources.add(NoopTopicFactories.getSourceFactory().build(param)); break; default: logger.debug("Unknown source type {} for topic: {}", param.getTopicCommInfrastructure(), - param.getTopic()); + param.getTopic()); break; } } @@ -110,14 +108,12 @@ class TopicEndpointProxy implements TopicEndpoint { @Override public List<TopicSource> addTopicSources(Properties properties) { - // 1. Create UEB Sources - // 2. Create DMAAP Sources - // 3. Create NOOP Sources + // 1. Create KAFKA Sources + // 2. Create NOOP Sources List<TopicSource> sources = new ArrayList<>(); - sources.addAll(UebTopicFactories.getSourceFactory().build(properties)); - sources.addAll(DmaapTopicFactories.getSourceFactory().build(properties)); + sources.addAll(KafkaTopicFactories.getSourceFactory().build(properties)); sources.addAll(NoopTopicFactories.getSourceFactory().build(properties)); lockSources(sources); @@ -137,18 +133,15 @@ class TopicEndpointProxy implements TopicEndpoint { for (TopicParameters param : paramList) { switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) { - case UEB: - sinks.add(UebTopicFactories.getSinkFactory().build(param)); - break; - case DMAAP: - sinks.add(DmaapTopicFactories.getSinkFactory().build(param)); + case KAFKA: + sinks.add(KafkaTopicFactories.getSinkFactory().build(param)); break; case NOOP: sinks.add(NoopTopicFactories.getSinkFactory().build(param)); break; default: logger.debug("Unknown sink type {} for topic: {}", param.getTopicCommInfrastructure(), - param.getTopic()); + param.getTopic()); break; } } @@ -160,14 +153,12 @@ class TopicEndpointProxy implements TopicEndpoint { @Override public List<TopicSink> addTopicSinks(Properties properties) { - // 1. Create UEB Sinks - // 2. Create DMAAP Sinks - // 3. Create NOOP Sinks + // 1. Create KAFKA Sinks + // 2. Create NOOP Sinks final List<TopicSink> sinks = new ArrayList<>(); - sinks.addAll(UebTopicFactories.getSinkFactory().build(properties)); - sinks.addAll(DmaapTopicFactories.getSinkFactory().build(properties)); + sinks.addAll(KafkaTopicFactories.getSinkFactory().build(properties)); sinks.addAll(NoopTopicFactories.getSinkFactory().build(properties)); lockSinks(sinks); @@ -186,8 +177,7 @@ class TopicEndpointProxy implements TopicEndpoint { final List<TopicSource> sources = new ArrayList<>(); - sources.addAll(UebTopicFactories.getSourceFactory().inventory()); - sources.addAll(DmaapTopicFactories.getSourceFactory().inventory()); + sources.addAll(KafkaTopicFactories.getSourceFactory().inventory()); sources.addAll(NoopTopicFactories.getSourceFactory().inventory()); return sources; @@ -201,34 +191,21 @@ class TopicEndpointProxy implements TopicEndpoint { } final List<TopicSource> sources = new ArrayList<>(); - for (final String topic : topicNames) { - try { - final TopicSource uebSource = this.getUebTopicSource(topic); - if (uebSource != null) { - sources.add(uebSource); - } - } catch (final Exception e) { - logger.debug("No UEB source for topic: {}", topic, e); - } + topicNames.forEach(topic -> { try { - final TopicSource dmaapSource = this.getDmaapTopicSource(topic); - if (dmaapSource != null) { - sources.add(dmaapSource); - } + sources.add(Objects.requireNonNull(this.getKafkaTopicSource(topic))); } catch (final Exception e) { - logger.debug("No DMAAP source for topic: {}", topic, e); + logger.debug("No KAFKA source for topic: {}", topic, e); } try { - final TopicSource noopSource = this.getNoopTopicSource(topic); - if (noopSource != null) { - sources.add(noopSource); - } + sources.add(Objects.requireNonNull(this.getNoopTopicSource(topic))); } catch (final Exception e) { logger.debug("No NOOP source for topic: {}", topic, e); } - } + }); + return sources; } @@ -237,8 +214,7 @@ class TopicEndpointProxy implements TopicEndpoint { final List<TopicSink> sinks = new ArrayList<>(); - sinks.addAll(UebTopicFactories.getSinkFactory().inventory()); - sinks.addAll(DmaapTopicFactories.getSinkFactory().inventory()); + sinks.addAll(KafkaTopicFactories.getSinkFactory().inventory()); sinks.addAll(NoopTopicFactories.getSinkFactory().inventory()); return sinks; @@ -254,28 +230,13 @@ class TopicEndpointProxy implements TopicEndpoint { final List<TopicSink> sinks = new ArrayList<>(); for (final String topic : topicNames) { try { - final TopicSink uebSink = this.getUebTopicSink(topic); - if (uebSink != null) { - sinks.add(uebSink); - } - } catch (final Exception e) { - logger.debug("No UEB sink for topic: {}", topic, e); - } - - try { - final TopicSink dmaapSink = this.getDmaapTopicSink(topic); - if (dmaapSink != null) { - sinks.add(dmaapSink); - } + sinks.add(Objects.requireNonNull(this.getKafkaTopicSink(topic))); } catch (final Exception e) { - logger.debug("No DMAAP sink for topic: {}", topic, e); + logger.debug("No KAFKA sink for topic: {}", topic, e); } try { - final TopicSink noopSink = this.getNoopTopicSink(topic); - if (noopSink != null) { - sinks.add(noopSink); - } + sinks.add(Objects.requireNonNull(this.getNoopTopicSink(topic))); } catch (final Exception e) { logger.debug("No NOOP sink for topic: {}", topic, e); } @@ -286,19 +247,13 @@ class TopicEndpointProxy implements TopicEndpoint { @Override public List<TopicSink> getTopicSinks(String topicName) { if (topicName == null) { - throw parmException(null); + throw paramException(null); } final List<TopicSink> sinks = new ArrayList<>(); try { - sinks.add(this.getUebTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - try { - sinks.add(this.getDmaapTopicSink(topicName)); + sinks.add(this.getKafkaTopicSink(topicName)); } catch (final Exception e) { logNoSink(topicName, e); } @@ -314,14 +269,8 @@ class TopicEndpointProxy implements TopicEndpoint { @GsonJsonIgnore @Override - public List<UebTopicSource> getUebTopicSources() { - return UebTopicFactories.getSourceFactory().inventory(); - } - - @GsonJsonIgnore - @Override - public List<DmaapTopicSource> getDmaapTopicSources() { - return DmaapTopicFactories.getSourceFactory().inventory(); + public List<KafkaTopicSource> getKafkaTopicSources() { + return KafkaTopicFactories.getSourceFactory().inventory(); } @GsonJsonIgnore @@ -330,16 +279,10 @@ class TopicEndpointProxy implements TopicEndpoint { return NoopTopicFactories.getSourceFactory().inventory(); } - @GsonJsonIgnore @Override - public List<UebTopicSink> getUebTopicSinks() { - return UebTopicFactories.getSinkFactory().inventory(); - } - @GsonJsonIgnore - @Override - public List<DmaapTopicSink> getDmaapTopicSinks() { - return DmaapTopicFactories.getSinkFactory().inventory(); + public List<KafkaTopicSink> getKafkaTopicSinks() { + return KafkaTopicFactories.getSinkFactory().inventory(); } @GsonJsonIgnore @@ -365,7 +308,7 @@ class TopicEndpointProxy implements TopicEndpoint { final List<Startable> endpoints = this.getEndpoints(); - boolean success = true; + var success = true; for (final Startable endpoint : endpoints) { try { success = endpoint.start() && success; @@ -391,7 +334,7 @@ class TopicEndpointProxy implements TopicEndpoint { final List<Startable> endpoints = this.getEndpoints(); - boolean success = true; + var success = true; for (final Startable endpoint : endpoints) { try { success = endpoint.stop() && success; @@ -423,11 +366,8 @@ class TopicEndpointProxy implements TopicEndpoint { public void shutdown() { this.stop(); - UebTopicFactories.getSourceFactory().destroy(); - UebTopicFactories.getSinkFactory().destroy(); - - DmaapTopicFactories.getSourceFactory().destroy(); - DmaapTopicFactories.getSinkFactory().destroy(); + KafkaTopicFactories.getSourceFactory().destroy(); + KafkaTopicFactories.getSinkFactory().destroy(); NoopTopicFactories.getSinkFactory().destroy(); NoopTopicFactories.getSourceFactory().destroy(); @@ -435,11 +375,6 @@ class TopicEndpointProxy implements TopicEndpoint { } @Override - public boolean isAlive() { - return this.alive; - } - - @Override public boolean lock() { boolean shouldLock; @@ -484,68 +419,43 @@ class TopicEndpointProxy implements TopicEndpoint { } @Override - public boolean isLocked() { - return this.locked; - } - - @Override public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) { if (commType == null) { - throw parmException(topicName); + throw paramException(topicName); } if (topicName == null) { - throw parmException(null); + throw paramException(null); } - switch (commType) { - case UEB: - return this.getUebTopicSource(topicName); - case DMAAP: - return this.getDmaapTopicSource(topicName); - case NOOP: - return this.getNoopTopicSource(topicName); - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } + return switch (commType) { + case KAFKA -> this.getKafkaTopicSource(topicName); + case NOOP -> this.getNoopTopicSource(topicName); + default -> throw new UnsupportedOperationException("Unsupported " + commType.name()); + }; } @Override public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) { if (commType == null) { - throw parmException(topicName); + throw paramException(topicName); } if (topicName == null) { - throw parmException(null); - } - - switch (commType) { - case UEB: - return this.getUebTopicSink(topicName); - case DMAAP: - return this.getDmaapTopicSink(topicName); - case NOOP: - return this.getNoopTopicSink(topicName); - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); + throw paramException(null); } - } - @Override - public UebTopicSource getUebTopicSource(String topicName) { - return UebTopicFactories.getSourceFactory().get(topicName); - } - - @Override - public UebTopicSink getUebTopicSink(String topicName) { - return UebTopicFactories.getSinkFactory().get(topicName); + return switch (commType) { + case KAFKA -> this.getKafkaTopicSink(topicName); + case NOOP -> this.getNoopTopicSink(topicName); + default -> throw new UnsupportedOperationException("Unsupported " + commType.name()); + }; } @Override - public DmaapTopicSource getDmaapTopicSource(String topicName) { - return DmaapTopicFactories.getSourceFactory().get(topicName); + public KafkaTopicSource getKafkaTopicSource(String topicName) { + return KafkaTopicFactories.getSourceFactory().get(topicName); } @Override @@ -554,8 +464,8 @@ class TopicEndpointProxy implements TopicEndpoint { } @Override - public DmaapTopicSink getDmaapTopicSink(String topicName) { - return DmaapTopicFactories.getSinkFactory().get(topicName); + public KafkaTopicSink getKafkaTopicSink(String topicName) { + return KafkaTopicFactories.getSinkFactory().get(topicName); } @Override @@ -563,7 +473,7 @@ class TopicEndpointProxy implements TopicEndpoint { return NoopTopicFactories.getSinkFactory().get(topicName); } - private IllegalArgumentException parmException(String topicName) { + private IllegalArgumentException paramException(String topicName) { return new IllegalArgumentException( "Invalid parameter: a communication infrastructure required to fetch " + topicName); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicSink.java index f36dfa31..b67756e5 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicSink.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,9 +33,9 @@ public interface TopicSink extends Topic { * * @return true if the send operation succeeded, false otherwise * @throws IllegalArgumentException an invalid message has been provided - * @throws IllegalStateException the entity is in an state that prevents + * @throws IllegalStateException the entity is in a state that prevents * it from sending messages, for example, locked or stopped. */ - public boolean send(String message); + boolean send(String message); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java index e77beea1..ceb9255e 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,21 +24,21 @@ package org.onap.policy.common.endpoints.event.comm.bus; import org.onap.policy.common.endpoints.event.comm.TopicSink; /** - * Topic Sink over Bus Infrastructure (DMAAP/UEB). + * Topic Sink over Bus Infrastructure (KAFKA). */ public interface BusTopicSink extends ApiKeyEnabled, TopicSink { /** - * Sets the UEB partition key for published messages. + * Sets the partition key for published messages. * * @param partitionKey the partition key */ - public void setPartitionKey(String partitionKey); + void setPartitionKey(String partitionKey); /** * Return the partition key in used by the system to publish messages. * * @return the partition key */ - public String getPartitionKey(); + String getPartitionKey(); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java index cd9bc015..87a06824 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,7 +24,7 @@ package org.onap.policy.common.endpoints.event.comm.bus; import org.onap.policy.common.endpoints.event.comm.TopicSource; /** - * Generic Topic Source for UEB/DMAAP Communication Infrastructure. + * Generic Topic Source for Bus Communication Infrastructure. * */ public interface BusTopicSource extends ApiKeyEnabled, TopicSource { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactories.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactories.java deleted file mode 100644 index d5a46f8f..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactories.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import lombok.Getter; - -public class DmaapTopicFactories { - - /** - * Factory for instantiation and management of sinks. - */ - @Getter - private static final DmaapTopicSinkFactory sinkFactory = new IndexedDmaapTopicSinkFactory(); - - /** - * Factory for instantiation and management of sources. - */ - @Getter - private static final DmaapTopicSourceFactory sourceFactory = new IndexedDmaapTopicSourceFactory(); - - - private DmaapTopicFactories() { - // do nothing - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java deleted file mode 100644 index 4409e827..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import java.util.List; -import java.util.Properties; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; - -/** - * DMAAP Topic Sink Factory. - */ -public interface DmaapTopicSinkFactory { - - /** - * <pre> - * Instantiate a new DMAAP Topic Sink, with following params. - * servers list of servers - * topic topic name - * apiKey API Key - * apiSecret API Secret - * userName AAF user name - * password AAF password - * partitionKey Consumer Group - * environment DME2 environment - * aftEnvironment DME2 AFT environment - * partner DME2 Partner - * latitude DME2 latitude - * longitude DME2 longitude - * additionalProps additional properties to pass to DME2 - * managed is this sink endpoint managed? - * </pre> - * @param busTopicParams parameter object - * @return DmaapTopicSink object - * @throws IllegalArgumentException if invalid parameters are present - */ - DmaapTopicSink build(BusTopicParams busTopicParams); - - /** - * Creates an DMAAP Topic Sink based on properties files. - * - * @param properties Properties containing initialization values - * @return an DMAAP Topic Sink - * @throws IllegalArgumentException if invalid parameters are present - */ - List<DmaapTopicSink> build(Properties properties); - - /** - * Instantiates a new DMAAP Topic Sink. - * - * @param servers list of servers - * @param topic topic name - * @return an DMAAP Topic Sink - * @throws IllegalArgumentException if invalid parameters are present - */ - DmaapTopicSink build(List<String> servers, String topic); - - /** - * Destroys an DMAAP Topic Sink based on a topic. - * - * @param topic topic name - * @throws IllegalArgumentException if invalid parameters are present - */ - void destroy(String topic); - - /** - * Destroys all DMAAP Topic Sinks. - */ - void destroy(); - - /** - * Gets an DMAAP Topic Sink based on topic name. - * - * @param topic the topic name - * @return an DMAAP Topic Sink with topic name - * @throws IllegalArgumentException if an invalid topic is provided - * @throws IllegalStateException if the DMAAP Topic Reader is an incorrect state - */ - DmaapTopicSink get(String topic); - - /** - * Provides a snapshot of the DMAAP Topic Sinks. - * - * @return a list of the DMAAP Topic Sinks - */ - List<DmaapTopicSink> inventory(); -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java deleted file mode 100644 index 6f0753b3..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Properties; -import org.apache.commons.lang3.StringUtils; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; -import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.onap.policy.common.endpoints.utils.DmaapPropertyUtils; -import org.onap.policy.common.endpoints.utils.PropertyUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Factory of DMAAP Reader Topics indexed by topic name. - */ -class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { - - private static final String MISSING_TOPIC = "A topic must be provided"; - - /** - * Logger. - */ - private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class); - - /** - * DMAAP Topic Name Index. - */ - protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>(); - - @Override - public DmaapTopicSink build(BusTopicParams busTopicParams) { - - if (StringUtils.isBlank(busTopicParams.getTopic())) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (dmaapTopicWriters.containsKey(busTopicParams.getTopic())) { - return dmaapTopicWriters.get(busTopicParams.getTopic()); - } - - DmaapTopicSink dmaapTopicSink = makeSink(busTopicParams); - - if (busTopicParams.isManaged()) { - dmaapTopicWriters.put(busTopicParams.getTopic(), dmaapTopicSink); - } - return dmaapTopicSink; - } - } - - @Override - public DmaapTopicSink build(List<String> servers, String topic) { - return this.build(BusTopicParams.builder() - .servers(servers) - .topic(topic) - .managed(true) - .useHttps(false) - .allowSelfSignedCerts(false) - .build()); - } - - @Override - public List<DmaapTopicSink> build(Properties properties) { - - String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS); - if (StringUtils.isBlank(writeTopics)) { - logger.info("{}: no topic for DMaaP Sink", this); - return new ArrayList<>(); - } - - List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>(); - synchronized (this) { - for (String topic : writeTopics.split("\\s*,\\s*")) { - addTopic(newDmaapTopicSinks, properties, topic); - } - return newDmaapTopicSinks; - } - } - - private void addTopic(List<DmaapTopicSink> newDmaapTopicSinks, Properties properties, String topic) { - if (this.dmaapTopicWriters.containsKey(topic)) { - newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic)); - return; - } - - String topicPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic; - - PropertyUtils props = new PropertyUtils(properties, topicPrefix, - (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); - - String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - if (StringUtils.isBlank(servers)) { - logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); - return; - } - - DmaapTopicSink dmaapTopicSink = this.build(DmaapPropertyUtils.makeBuilder(props, topic, servers) - .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null)) - .build()); - - newDmaapTopicSinks.add(dmaapTopicSink); - } - - /** - * Makes a new sink. - * - * @param busTopicParams parameters to use to configure the sink - * @return a new sink - */ - protected DmaapTopicSink makeSink(BusTopicParams busTopicParams) { - return new InlineDmaapTopicSink(busTopicParams); - } - - @Override - public void destroy(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - DmaapTopicSink dmaapTopicWriter; - synchronized (this) { - if (!dmaapTopicWriters.containsKey(topic)) { - return; - } - - dmaapTopicWriter = dmaapTopicWriters.remove(topic); - } - - dmaapTopicWriter.shutdown(); - } - - @Override - public void destroy() { - List<DmaapTopicSink> writers = this.inventory(); - for (DmaapTopicSink writer : writers) { - writer.shutdown(); - } - - synchronized (this) { - this.dmaapTopicWriters.clear(); - } - } - - @Override - public DmaapTopicSink get(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (dmaapTopicWriters.containsKey(topic)) { - return dmaapTopicWriters.get(topic); - } else { - throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); - } - } - } - - @Override - public synchronized List<DmaapTopicSink> inventory() { - return new ArrayList<>(this.dmaapTopicWriters.values()); - } - - @Override - public String toString() { - return "IndexedDmaapTopicSinkFactory []"; - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java deleted file mode 100644 index d7f4695e..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Properties; -import org.apache.commons.lang3.StringUtils; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; -import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.onap.policy.common.endpoints.utils.DmaapPropertyUtils; -import org.onap.policy.common.endpoints.utils.PropertyUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Factory of DMAAP Source Topics indexed by topic name. - */ - -class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { - private static final String MISSING_TOPIC = "A topic must be provided"; - - /** - * Logger. - */ - private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class); - - /** - * DMaaP Topic Name Index. - */ - protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>(); - - @Override - public DmaapTopicSource build(BusTopicParams busTopicParams) { - - if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (dmaapTopicSources.containsKey(busTopicParams.getTopic())) { - return dmaapTopicSources.get(busTopicParams.getTopic()); - } - - DmaapTopicSource dmaapTopicSource = makeSource(busTopicParams); - - if (busTopicParams.isManaged()) { - dmaapTopicSources.put(busTopicParams.getTopic(), dmaapTopicSource); - } - return dmaapTopicSource; - } - } - - @Override - public List<DmaapTopicSource> build(Properties properties) { - - String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS); - if (StringUtils.isBlank(readTopics)) { - logger.info("{}: no topic for DMaaP Source", this); - return new ArrayList<>(); - } - - List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>(); - synchronized (this) { - for (String topic : readTopics.split("\\s*,\\s*")) { - addTopic(dmaapTopicSourceLst, properties, topic); - } - } - return dmaapTopicSourceLst; - } - - @Override - public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) { - return this.build(BusTopicParams.builder() - .servers(servers) - .topic(topic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH) - .fetchLimit(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH) - .managed(true) - .useHttps(false) - .allowSelfSignedCerts(false) - .build()); - } - - @Override - public DmaapTopicSource build(List<String> servers, String topic) { - return this.build(servers, topic, null, null); - } - - private void addTopic(List<DmaapTopicSource> dmaapTopicSourceLst, Properties properties, String topic) { - if (this.dmaapTopicSources.containsKey(topic)) { - dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic)); - return; - } - - String topicPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic; - - PropertyUtils props = new PropertyUtils(properties, topicPrefix, - (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); - - String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - if (StringUtils.isBlank(servers)) { - logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); - return; - } - - DmaapTopicSource uebTopicSource = this.build(DmaapPropertyUtils.makeBuilder(props, topic, servers) - .consumerGroup(props.getString( - PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null)) - .consumerInstance(props.getString( - PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null)) - .fetchTimeout(props.getInteger( - PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX, - PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH)) - .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, - PolicyEndPointProperties.DEFAULT_LIMIT_FETCH)) - .build()); - - dmaapTopicSourceLst.add(uebTopicSource); - } - - /** - * Makes a new source. - * - * @param busTopicParams parameters to use to configure the source - * @return a new source - */ - protected DmaapTopicSource makeSource(BusTopicParams busTopicParams) { - return new SingleThreadedDmaapTopicSource(busTopicParams); - } - - @Override - public void destroy(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - DmaapTopicSource uebTopicSource; - - synchronized (this) { - if (!dmaapTopicSources.containsKey(topic)) { - return; - } - - uebTopicSource = dmaapTopicSources.remove(topic); - } - - uebTopicSource.shutdown(); - } - - @Override - public void destroy() { - List<DmaapTopicSource> readers = this.inventory(); - for (DmaapTopicSource reader : readers) { - reader.shutdown(); - } - - synchronized (this) { - this.dmaapTopicSources.clear(); - } - } - - @Override - public DmaapTopicSource get(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (dmaapTopicSources.containsKey(topic)) { - return dmaapTopicSources.get(topic); - } else { - throw new IllegalStateException("DmaapTopiceSource for " + topic + " not found"); - } - } - } - - @Override - public synchronized List<DmaapTopicSource> inventory() { - return new ArrayList<>(this.dmaapTopicSources.values()); - } - - @Override - public String toString() { - return "IndexedDmaapTopicSourceFactory []"; - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java index 150a02c0..f913926e 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java @@ -1,8 +1,6 @@ /* * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2022-2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,40 +18,42 @@ package org.onap.policy.common.endpoints.event.comm.bus; +import com.google.re2j.Pattern; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Properties; import org.apache.commons.lang3.StringUtils; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; -import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineKafkaTopicSink; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.onap.policy.common.endpoints.utils.KafkaPropertyUtils; import org.onap.policy.common.endpoints.utils.PropertyUtils; -import org.onap.policy.common.endpoints.utils.UebPropertyUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Factory of UEB Reader Topics indexed by topic name. + * Factory of KAFKA Reader Topics indexed by topic name. */ -class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { +class IndexedKafkaTopicSinkFactory implements KafkaTopicSinkFactory { + private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*"); private static final String MISSING_TOPIC = "A topic must be provided"; /** * Logger. */ - private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class); + private static final Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSinkFactory.class); /** - * UEB Topic Name Index. + * KAFKA Topic Name Index. */ - protected HashMap<String, UebTopicSink> uebTopicSinks = new HashMap<>(); + protected HashMap<String, KafkaTopicSink> kafkaTopicSinks = new HashMap<>(); @Override - public UebTopicSink build(BusTopicParams busTopicParams) { + public KafkaTopicSink build(BusTopicParams busTopicParams) { if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) { - throw new IllegalArgumentException("UEB Server(s) must be provided"); + throw new IllegalArgumentException("KAFKA Server(s) must be provided"); } if (StringUtils.isBlank(busTopicParams.getTopic())) { @@ -61,72 +61,71 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { } synchronized (this) { - if (uebTopicSinks.containsKey(busTopicParams.getTopic())) { - return uebTopicSinks.get(busTopicParams.getTopic()); + if (kafkaTopicSinks.containsKey(busTopicParams.getTopic())) { + return kafkaTopicSinks.get(busTopicParams.getTopic()); } - UebTopicSink uebTopicWriter = makeSink(busTopicParams); - + KafkaTopicSink kafkaTopicWriter = makeSink(busTopicParams); if (busTopicParams.isManaged()) { - uebTopicSinks.put(busTopicParams.getTopic(), uebTopicWriter); + kafkaTopicSinks.put(busTopicParams.getTopic(), kafkaTopicWriter); } - return uebTopicWriter; + return kafkaTopicWriter; } } @Override - public UebTopicSink build(List<String> servers, String topic) { + public KafkaTopicSink build(List<String> servers, String topic) { return this.build(BusTopicParams.builder() .servers(servers) .topic(topic) .managed(true) .useHttps(false) - .allowSelfSignedCerts(false) .build()); } @Override - public List<UebTopicSink> build(Properties properties) { + public List<KafkaTopicSink> build(Properties properties) { - String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS); + String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS); if (StringUtils.isBlank(writeTopics)) { - logger.info("{}: no topic for UEB Sink", this); + logger.info("{}: no topic for KAFKA Sink", this); return new ArrayList<>(); } - List<UebTopicSink> newUebTopicSinks = new ArrayList<>(); + List<KafkaTopicSink> newKafkaTopicSinks = new ArrayList<>(); synchronized (this) { - for (String topic : writeTopics.split("\\s*,\\s*")) { - addTopic(newUebTopicSinks, topic, properties); + for (String topic : COMMA_SPACE_PAT.split(writeTopics)) { + addTopic(newKafkaTopicSinks, topic.toLowerCase(), properties); } - return newUebTopicSinks; + return newKafkaTopicSinks; } } - private void addTopic(List<UebTopicSink> newUebTopicSinks, String topic, Properties properties) { - if (this.uebTopicSinks.containsKey(topic)) { - newUebTopicSinks.add(this.uebTopicSinks.get(topic)); + private void addTopic(List<KafkaTopicSink> newKafkaTopicSinks, String topic, Properties properties) { + if (this.kafkaTopicSinks.containsKey(topic)) { + newKafkaTopicSinks.add(this.kafkaTopicSinks.get(topic)); return; } - String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic; + String topicPrefix = PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic; - PropertyUtils props = new PropertyUtils(properties, topicPrefix, - (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); + var props = new PropertyUtils(properties, topicPrefix, + (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic sink {} ", + this, name, value, topic)); String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); if (StringUtils.isBlank(servers)) { - logger.error("{}: no UEB servers configured for sink {}", this, topic); + logger.error("{}: no KAFKA servers configured for sink {}", this, topic); return; } - UebTopicSink uebTopicWriter = this.build(UebPropertyUtils.makeBuilder(props, topic, servers) + KafkaTopicSink kafkaTopicWriter = this.build(KafkaPropertyUtils.makeBuilder(props, topic, servers) .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null)) .build()); - newUebTopicSinks.add(uebTopicWriter); + newKafkaTopicSinks.add(kafkaTopicWriter); } @Override @@ -136,49 +135,49 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { throw new IllegalArgumentException(MISSING_TOPIC); } - UebTopicSink uebTopicWriter; + KafkaTopicSink kafkaTopicWriter; synchronized (this) { - if (!uebTopicSinks.containsKey(topic)) { + if (!kafkaTopicSinks.containsKey(topic)) { return; } - uebTopicWriter = uebTopicSinks.remove(topic); + kafkaTopicWriter = kafkaTopicSinks.remove(topic); } - uebTopicWriter.shutdown(); + kafkaTopicWriter.shutdown(); } @Override public void destroy() { - List<UebTopicSink> writers = this.inventory(); - for (UebTopicSink writer : writers) { + List<KafkaTopicSink> writers = this.inventory(); + for (KafkaTopicSink writer : writers) { writer.shutdown(); } synchronized (this) { - this.uebTopicSinks.clear(); + this.kafkaTopicSinks.clear(); } } @Override - public UebTopicSink get(String topic) { + public KafkaTopicSink get(String topic) { if (topic == null || topic.isEmpty()) { throw new IllegalArgumentException(MISSING_TOPIC); } synchronized (this) { - if (uebTopicSinks.containsKey(topic)) { - return uebTopicSinks.get(topic); + if (kafkaTopicSinks.containsKey(topic)) { + return kafkaTopicSinks.get(topic); } else { - throw new IllegalStateException("UebTopicSink for " + topic + " not found"); + throw new IllegalStateException("KafkaTopicSink for " + topic + " not found"); } } } @Override - public synchronized List<UebTopicSink> inventory() { - return new ArrayList<>(this.uebTopicSinks.values()); + public synchronized List<KafkaTopicSink> inventory() { + return new ArrayList<>(this.kafkaTopicSinks.values()); } /** @@ -187,16 +186,14 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { * @param busTopicParams parameters to use to configure the sink * @return a new sink */ - protected UebTopicSink makeSink(BusTopicParams busTopicParams) { - return new InlineUebTopicSink(busTopicParams); + protected KafkaTopicSink makeSink(BusTopicParams busTopicParams) { + return new InlineKafkaTopicSink(busTopicParams); } @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("IndexedUebTopicSinkFactory []"); - return builder.toString(); + return "IndexedKafkaTopicSinkFactory " + kafkaTopicSinks.keySet(); } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java new file mode 100644 index 00000000..151d8f69 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java @@ -0,0 +1,204 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022-2023 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import com.google.re2j.Pattern; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; +import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedKafkaTopicSource; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.onap.policy.common.endpoints.utils.KafkaPropertyUtils; +import org.onap.policy.common.endpoints.utils.PropertyUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory of KAFKA Source Topics indexed by topic name. + */ +class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory { + private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*"); + private static final String MISSING_TOPIC = "A topic must be provided"; + + /** + * Logger. + */ + private static final Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSourceFactory.class); + + /** + * KAFKA Topic Name Index. + */ + protected HashMap<String, KafkaTopicSource> kafkaTopicSources = new HashMap<>(); + + @Override + public KafkaTopicSource build(BusTopicParams busTopicParams) { + if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) { + throw new IllegalArgumentException("KAFKA Server(s) must be provided"); + } + + if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (kafkaTopicSources.containsKey(busTopicParams.getTopic())) { + return kafkaTopicSources.get(busTopicParams.getTopic()); + } + + var kafkaTopicSource = makeSource(busTopicParams); + + kafkaTopicSources.put(busTopicParams.getTopic(), kafkaTopicSource); + + return kafkaTopicSource; + } + } + + @Override + public List<KafkaTopicSource> build(Properties properties) { + + String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS); + if (StringUtils.isBlank(readTopics)) { + logger.info("{}: no topic for KAFKA Source", this); + return new ArrayList<>(); + } + + List<KafkaTopicSource> newKafkaTopicSources = new ArrayList<>(); + synchronized (this) { + for (String topic : COMMA_SPACE_PAT.split(readTopics)) { + addTopic(newKafkaTopicSources, topic.toLowerCase(), properties); + } + } + return newKafkaTopicSources; + } + + @Override + public KafkaTopicSource build(List<String> servers, String topic) { + return this.build(BusTopicParams.builder() + .servers(servers) + .topic(topic) + .managed(true) + .fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH) + .fetchLimit(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH) + .useHttps(false).build()); + } + + private void addTopic(List<KafkaTopicSource> newKafkaTopicSources, String topic, Properties properties) { + if (this.kafkaTopicSources.containsKey(topic)) { + newKafkaTopicSources.add(this.kafkaTopicSources.get(topic)); + return; + } + + String topicPrefix = PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic; + + var props = new PropertyUtils(properties, topicPrefix, + (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic source {} ", + this, name, value, topic)); + + String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + if (StringUtils.isBlank(servers)) { + logger.error("{}: no KAFKA servers configured for source {}", this, topic); + return; + } + + var kafkaTopicSource = this.build(KafkaPropertyUtils.makeBuilder(props, topic, servers) + .consumerGroup(props.getString( + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null)) + .consumerInstance(props.getString( + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null)) + .fetchTimeout(props.getInteger( + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX, + PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH)) + .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, + PolicyEndPointProperties.DEFAULT_LIMIT_FETCH)) + .build()); + + newKafkaTopicSources.add(kafkaTopicSource); + } + + /** + * Makes a new source. + * + * @param busTopicParams parameters to use to configure the source + * @return a new source + */ + protected KafkaTopicSource makeSource(BusTopicParams busTopicParams) { + return new SingleThreadedKafkaTopicSource(busTopicParams); + } + + @Override + public void destroy(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + KafkaTopicSource kafkaTopicSource; + + synchronized (this) { + if (!kafkaTopicSources.containsKey(topic)) { + return; + } + + kafkaTopicSource = kafkaTopicSources.remove(topic); + } + + kafkaTopicSource.shutdown(); + } + + @Override + public void destroy() { + List<KafkaTopicSource> readers = this.inventory(); + for (KafkaTopicSource reader : readers) { + reader.shutdown(); + } + + synchronized (this) { + this.kafkaTopicSources.clear(); + } + } + + @Override + public KafkaTopicSource get(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (kafkaTopicSources.containsKey(topic)) { + return kafkaTopicSources.get(topic); + } else { + throw new IllegalStateException("KafkaTopiceSource for " + topic + " not found"); + } + } + } + + @Override + public synchronized List<KafkaTopicSource> inventory() { + return new ArrayList<>(this.kafkaTopicSources.values()); + } + + @Override + public String toString() { + return "IndexedKafkaTopicSourceFactory " + kafkaTopicSources.keySet(); + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java deleted file mode 100644 index 5bdc8ab6..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Properties; -import org.apache.commons.lang3.StringUtils; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; -import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.onap.policy.common.endpoints.utils.PropertyUtils; -import org.onap.policy.common.endpoints.utils.UebPropertyUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Factory of UEB Source Topics indexed by topic name. - */ -class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { - private static final String MISSING_TOPIC = "A topic must be provided"; - - /** - * Logger. - */ - private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class); - - /** - * UEB Topic Name Index. - */ - protected HashMap<String, UebTopicSource> uebTopicSources = new HashMap<>(); - - @Override - public UebTopicSource build(BusTopicParams busTopicParams) { - if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) { - throw new IllegalArgumentException("UEB Server(s) must be provided"); - } - - if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (uebTopicSources.containsKey(busTopicParams.getTopic())) { - return uebTopicSources.get(busTopicParams.getTopic()); - } - - UebTopicSource uebTopicSource = makeSource(busTopicParams); - - if (busTopicParams.isManaged()) { - uebTopicSources.put(busTopicParams.getTopic(), uebTopicSource); - } - - return uebTopicSource; - } - } - - @Override - public List<UebTopicSource> build(Properties properties) { - - String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS); - if (StringUtils.isBlank(readTopics)) { - logger.info("{}: no topic for UEB Source", this); - return new ArrayList<>(); - } - - List<UebTopicSource> newUebTopicSources = new ArrayList<>(); - synchronized (this) { - for (String topic : readTopics.split("\\s*,\\s*")) { - addTopic(newUebTopicSources, topic, properties); - } - } - return newUebTopicSources; - } - - @Override - public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) { - - return this.build(BusTopicParams.builder() - .servers(servers) - .topic(topic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH) - .fetchLimit(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH) - .managed(true) - .useHttps(false) - .allowSelfSignedCerts(true).build()); - } - - @Override - public UebTopicSource build(List<String> servers, String topic) { - return this.build(servers, topic, null, null); - } - - private void addTopic(List<UebTopicSource> newUebTopicSources, String topic, Properties properties) { - if (this.uebTopicSources.containsKey(topic)) { - newUebTopicSources.add(this.uebTopicSources.get(topic)); - return; - } - - String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic; - - PropertyUtils props = new PropertyUtils(properties, topicPrefix, - (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); - - String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - if (StringUtils.isBlank(servers)) { - logger.error("{}: no UEB servers configured for sink {}", this, topic); - return; - } - - UebTopicSource uebTopicSource = this.build(UebPropertyUtils.makeBuilder(props, topic, servers) - .consumerGroup(props.getString( - PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null)) - .consumerInstance(props.getString( - PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null)) - .fetchTimeout(props.getInteger( - PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX, - PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH)) - .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, - PolicyEndPointProperties.DEFAULT_LIMIT_FETCH)) - .build()); - - newUebTopicSources.add(uebTopicSource); - } - - /** - * Makes a new source. - * - * @param busTopicParams parameters to use to configure the source - * @return a new source - */ - protected UebTopicSource makeSource(BusTopicParams busTopicParams) { - return new SingleThreadedUebTopicSource(busTopicParams); - } - - @Override - public void destroy(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - UebTopicSource uebTopicSource; - - synchronized (this) { - if (!uebTopicSources.containsKey(topic)) { - return; - } - - uebTopicSource = uebTopicSources.remove(topic); - } - - uebTopicSource.shutdown(); - } - - @Override - public void destroy() { - List<UebTopicSource> readers = this.inventory(); - for (UebTopicSource reader : readers) { - reader.shutdown(); - } - - synchronized (this) { - this.uebTopicSources.clear(); - } - } - - @Override - public UebTopicSource get(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (uebTopicSources.containsKey(topic)) { - return uebTopicSources.get(topic); - } else { - throw new IllegalStateException("UebTopiceSource for " + topic + " not found"); - } - } - } - - @Override - public synchronized List<UebTopicSource> inventory() { - return new ArrayList<>(this.uebTopicSources.values()); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("IndexedUebTopicSourceFactory []"); - return builder.toString(); - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactories.java index d02758be..60db3857 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactories.java @@ -1,8 +1,6 @@ /* * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,24 +18,22 @@ package org.onap.policy.common.endpoints.event.comm.bus; +import lombok.AccessLevel; import lombok.Getter; +import lombok.NoArgsConstructor; -public class UebTopicFactories { +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class KafkaTopicFactories { /** * Factory for instantiation and management of sinks. */ @Getter - private static final UebTopicSinkFactory sinkFactory = new IndexedUebTopicSinkFactory(); + private static final KafkaTopicSinkFactory sinkFactory = new IndexedKafkaTopicSinkFactory(); /** * Factory for instantiation and management of sources. */ @Getter - private static final UebTopicSourceFactory sourceFactory = new IndexedUebTopicSourceFactory(); - - - private UebTopicFactories() { - // do nothing - } + private static final KafkaTopicSourceFactory sourceFactory = new IndexedKafkaTopicSourceFactory(); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSink.java index 805ed108..960a02c5 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSink.java @@ -1,8 +1,6 @@ -/*- +/* * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +18,9 @@ package org.onap.policy.common.endpoints.event.comm.bus; -public interface DmaapTopicSink extends BusTopicSink { +/** + * Topic Writer over KAFKA Infrastructure. + */ +public interface KafkaTopicSink extends BusTopicSink { } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactory.java index 0cf095fd..fa5e56f9 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactory.java @@ -1,16 +1,13 @@ /* * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. + * Copyright (C) 2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -26,67 +23,67 @@ import java.util.Properties; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; /** - * UEB Topic Sink Factory. + * KAFKA Topic Sink Factory. */ -public interface UebTopicSinkFactory { +public interface KafkaTopicSinkFactory { /** - * Instantiates a new UEB Topic Writer. - * + * Instantiates a new KAFKA Topic Writer. + * * @param busTopicParams parameters object - * @return an UEB Topic Sink + * @return an KAFKA Topic Sink */ - UebTopicSink build(BusTopicParams busTopicParams); + KafkaTopicSink build(BusTopicParams busTopicParams); /** - * Creates an UEB Topic Writer based on properties files. - * + * Creates an KAFKA Topic Writer based on properties files. + * * @param properties Properties containing initialization values - * - * @return an UEB Topic Writer + * + * @return an KAFKA Topic Writer * @throws IllegalArgumentException if invalid parameters are present */ - List<UebTopicSink> build(Properties properties); + List<KafkaTopicSink> build(Properties properties); /** - * Instantiates a new UEB Topic Writer. - * + * Instantiates a new KAFKA Topic Writer. + * * @param servers list of servers * @param topic topic name - * - * @return an UEB Topic Writer + * + * @return an KAFKA Topic Writer * @throws IllegalArgumentException if invalid parameters are present */ - UebTopicSink build(List<String> servers, String topic); + KafkaTopicSink build(List<String> servers, String topic); /** - * Destroys an UEB Topic Writer based on a topic. - * + * Destroys an KAFKA Topic Writer based on a topic. + * * @param topic topic name * @throws IllegalArgumentException if invalid parameters are present */ void destroy(String topic); /** - * Destroys all UEB Topic Writers. + * Destroys all KAFKA Topic Writers. */ void destroy(); /** - * gets an UEB Topic Writer based on topic name. - * + * gets an KAFKA Topic Writer based on topic name. + * * @param topic the topic name - * - * @return an UEB Topic Writer with topic name + * + * @return an KAFKA Topic Writer with topic name * @throws IllegalArgumentException if an invalid topic is provided - * @throws IllegalStateException if the UEB Topic Reader is an incorrect state + * @throws IllegalStateException if the KAFKA Topic Reader is an incorrect state */ - UebTopicSink get(String topic); + KafkaTopicSink get(String topic); /** - * Provides a snapshot of the UEB Topic Writers. - * - * @return a list of the UEB Topic Writers + * Provides a snapshot of the KAFKA Topic Writers. + * + * @return a list of the KAFKA Topic Writers */ - List<UebTopicSink> inventory(); + List<KafkaTopicSink> inventory(); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSource.java index 9893fa15..03efd083 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSource.java @@ -1,8 +1,6 @@ -/*- +/* * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +18,9 @@ package org.onap.policy.common.endpoints.event.comm.bus; -public interface DmaapTopicSource extends BusTopicSource { +/** + * Kafka Topic Source. + */ +public interface KafkaTopicSource extends BusTopicSource { -} +}
\ No newline at end of file diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java index 7b1f185b..e5642daa 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java @@ -1,9 +1,6 @@ /* * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. + * Copyright (C) 2022-2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,54 +23,41 @@ import java.util.Properties; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; /** - * DMAAP Topic Source Factory. + * Kafka Topic Source Factory. */ -public interface DmaapTopicSourceFactory { +public interface KafkaTopicSourceFactory { /** - * Creates an DMAAP Topic Source based on properties files. + * Creates a Kafka Topic Source based on properties files. * * @param properties Properties containing initialization values * - * @return an DMAAP Topic Source + * @return a Kafka Topic Source * @throws IllegalArgumentException if invalid parameters are present */ - List<DmaapTopicSource> build(Properties properties); + List<KafkaTopicSource> build(Properties properties); /** - * Instantiates a new DMAAP Topic Source. + * Instantiates a new Kafka Topic Source. * * @param busTopicParams parameters object - * @return a DMAAP Topic Source - */ - DmaapTopicSource build(BusTopicParams busTopicParams); - - /** - * Instantiates a new DMAAP Topic Source. - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * - * @return an DMAAP Topic Source - * @throws IllegalArgumentException if invalid parameters are present + * @return a Kafka Topic Source */ - DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret); + KafkaTopicSource build(BusTopicParams busTopicParams); /** - * Instantiates a new DMAAP Topic Source. + * Instantiates a new Kafka Topic Source. * * @param servers list of servers * @param topic topic name * - * @return an DMAAP Topic Source + * @return a Kafka Topic Source * @throws IllegalArgumentException if invalid parameters are present */ - DmaapTopicSource build(List<String> servers, String topic); + KafkaTopicSource build(List<String> servers, String topic); /** - * Destroys an DMAAP Topic Source based on a topic. + * Destroys a Kafka Topic Source based on a topic. * * @param topic topic name * @throws IllegalArgumentException if invalid parameters are present @@ -81,24 +65,24 @@ public interface DmaapTopicSourceFactory { void destroy(String topic); /** - * Destroys all DMAAP Topic Sources. + * Destroys all Kafka Topic Sources. */ void destroy(); /** - * Gets an DMAAP Topic Source based on topic name. + * Gets a Kafka Topic Source based on topic name. * * @param topic the topic name - * @return an DMAAP Topic Source with topic name + * @return a Kafka Topic Source with topic name * @throws IllegalArgumentException if an invalid topic is provided - * @throws IllegalStateException if the DMAAP Topic Source is an incorrect state + * @throws IllegalStateException if the Kafka Topic Source is an incorrect state */ - DmaapTopicSource get(String topic); + KafkaTopicSource get(String topic); /** - * Provides a snapshot of the DMAAP Topic Sources. + * Provides a snapshot of the Kafka Topic Sources. * - * @return a list of the DMAAP Topic Sources + * @return a list of the Kafka Topic Sources */ - List<DmaapTopicSource> inventory(); + List<KafkaTopicSource> inventory(); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicEndpoint.java index 833574a3..73ff6ed6 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicEndpoint.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicEndpoint.java @@ -4,6 +4,7 @@ * ================================================================================ * Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2019 Samsung Electronics Co., Ltd. + * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,7 +42,7 @@ public abstract class NoopTopicEndpoint extends TopicBase { /** * Constructs the object. */ - public NoopTopicEndpoint(List<String> servers, String topic) { + protected NoopTopicEndpoint(List<String> servers, String topic) { super(servers, topic); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactories.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactories.java index aa85e714..c3e7e0a4 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactories.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactories.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,8 +20,11 @@ package org.onap.policy.common.endpoints.event.comm.bus; +import lombok.AccessLevel; import lombok.Getter; +import lombok.NoArgsConstructor; +@NoArgsConstructor(access = AccessLevel.PRIVATE) public class NoopTopicFactories { /** @@ -35,9 +38,4 @@ public class NoopTopicFactories { */ @Getter private static final NoopTopicSourceFactory sourceFactory = new NoopTopicSourceFactory(); - - - private NoopTopicFactories() { - // do nothing - } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactory.java index 98b6ed6f..dfe7c2c4 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactory.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ package org.onap.policy.common.endpoints.event.comm.bus; +import com.google.re2j.Pattern; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -32,6 +33,7 @@ import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; * Noop Topic Factory. */ public abstract class NoopTopicFactory<T extends NoopTopicEndpoint> extends TopicBaseHashedFactory<T> { + private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*"); /** * Get Topics Property Name. @@ -50,7 +52,7 @@ public abstract class NoopTopicFactory<T extends NoopTopicEndpoint> extends Topi return new ArrayList<>(); } - return Arrays.asList(topics.split("\\s*,\\s*")); + return Arrays.asList(COMMA_SPACE_PAT.split(topics)); } /** @@ -66,7 +68,7 @@ public abstract class NoopTopicFactory<T extends NoopTopicEndpoint> extends Topi servers = CommInfrastructure.NOOP.toString(); } - return new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + return new ArrayList<>(Arrays.asList(COMMA_SPACE_PAT.split(servers))); } /** @@ -74,11 +76,11 @@ public abstract class NoopTopicFactory<T extends NoopTopicEndpoint> extends Topi */ @Override protected boolean isManaged(String topicName, Properties properties) { - String managedString = + var managedString = properties.getProperty(getTopicsPropertyName() + "." + topicName + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; + var managed = true; if (managedString != null && !managedString.isEmpty()) { managed = Boolean.parseBoolean(managedString); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseHashedFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseHashedFactory.java index f53c5ea8..c785ef04 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseHashedFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/TopicBaseHashedFactory.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -126,7 +126,7 @@ public abstract class TopicBaseHashedFactory<T extends Topic> implements TopicBa return this.endpoints.get(topic); } - T endpoint = build(servers, topic); + var endpoint = build(servers, topic); if (managed) { this.endpoints.put(topic, endpoint); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java deleted file mode 100644 index acfef6da..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java +++ /dev/null @@ -1,28 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -/** - * Topic Writer over UEB Infrastructure. - */ -public interface UebTopicSink extends BusTopicSink { - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java deleted file mode 100644 index 56534309..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java +++ /dev/null @@ -1,29 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -/** - * Topic Source for UEB Communication Infrastructure. - * - */ -public interface UebTopicSource extends BusTopicSource { - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java deleted file mode 100644 index beacee3b..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus; - -import java.util.List; -import java.util.Properties; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; - -/** - * UEB Topic Source Factory. - */ -public interface UebTopicSourceFactory { - - /** - * Creates an UEB Topic Source based on properties files. - * - * @param properties Properties containing initialization values - * - * @return an UEB Topic Source - * @throws IllegalArgumentException if invalid parameters are present - */ - List<UebTopicSource> build(Properties properties); - - /** - * Instantiates a new UEB Topic Source. - * - * @param busTopicParams parameters object - * @return an UEB Topic Source - */ - UebTopicSource build(BusTopicParams busTopicParams); - - /** - * Instantiates a new UEB Topic Source. - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * - * @return an UEB Topic Source - * @throws IllegalArgumentException if invalid parameters are present - */ - UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret); - - /** - * Instantiates a new UEB Topic Source. - * - * @param servers list of servers - * @param topic topic name - * - * @return an UEB Topic Source - * @throws IllegalArgumentException if invalid parameters are present - */ - UebTopicSource build(List<String> servers, String topic); - - /** - * Destroys an UEB Topic Source based on a topic. - * - * @param topic topic name - * @throws IllegalArgumentException if invalid parameters are present - */ - void destroy(String topic); - - /** - * Destroys all UEB Topic Sources. - */ - void destroy(); - - /** - * Gets an UEB Topic Source based on topic name. - * - * @param topic the topic name - * @return an UEB Topic Source with topic name - * @throws IllegalArgumentException if an invalid topic is provided - * @throws IllegalStateException if the UEB Topic Source is an incorrect state - */ - UebTopicSource get(String topic); - - /** - * Provides a snapshot of the UEB Topic Sources. - * - * @return a list of the UEB Topic Sources - */ - List<UebTopicSource> inventory(); -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 233434f1..b46c2715 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -2,8 +2,10 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. + * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2022-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,23 +23,31 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -import com.att.nsa.cambria.client.CambriaConsumer; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor; import java.io.IOException; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; -import java.util.Map; +import java.util.Collections; +import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.StringUtils; -import org.onap.dmaap.mr.client.MRClientFactory; -import org.onap.dmaap.mr.client.impl.MRConsumerImpl; -import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder; -import org.onap.dmaap.mr.client.response.MRConsumerResponse; -import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; +import lombok.Data; +import lombok.Getter; +import lombok.NoArgsConstructor; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Headers; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,126 +71,54 @@ public interface BusConsumer { public void close(); /** - * BusConsumer that supports server-side filtering. + * Consumer that handles fetch() failures by sleeping. */ - public interface FilterableBusConsumer extends BusConsumer { + abstract class FetchingBusConsumer implements BusConsumer { + private static final Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class); /** - * Sets the server-side filter. - * - * @param filter new filter value, or {@code null} - * @throws IllegalArgumentException if the consumer cannot be built with the new filter - */ - public void setFilter(String filter); - } - - /** - * Cambria based consumer. - */ - public static class CambriaConsumerWrapper implements FilterableBusConsumer { - - /** - * logger. - */ - private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class); - - /** - * Used to build the consumer. + * Fetch timeout. */ - private final ConsumerBuilder builder; - - /** - * Locked while updating {@link #consumer} and {@link #newConsumer}. - */ - private final Object consLocker = new Object(); - - /** - * Cambria client. - */ - private CambriaConsumer consumer; + protected int fetchTimeout; /** - * Cambria client to use for next fetch. + * Time to sleep on a fetch failure. */ - private CambriaConsumer newConsumer = null; + @Getter + private final int sleepTime; /** - * fetch timeout. + * Counted down when {@link #close()} is invoked. */ - protected int fetchTimeout; + private final CountDownLatch closeCondition = new CountDownLatch(1); - /** - * close condition. - */ - protected CountDownLatch closeCondition = new CountDownLatch(1); /** - * Cambria Consumer Wrapper. - * BusTopicParam object contains the following parameters - * servers messaging bus hosts. - * topic topic - * apiKey API Key - * apiSecret API Secret - * consumerGroup Consumer Group - * consumerInstance Consumer Instance - * fetchTimeout Fetch Timeout - * fetchLimit Fetch Limit + * Constructs the object. * - * @param busTopicParams - The parameters for the bus topic - * @throws GeneralSecurityException - Security exception - * @throws MalformedURLException - Malformed URL exception + * @param busTopicParams parameters for the bus topic */ - public CambriaConsumerWrapper(BusTopicParams busTopicParams) { - + protected FetchingBusConsumer(BusTopicParams busTopicParams) { this.fetchTimeout = busTopicParams.getFetchTimeout(); - this.builder = new CambriaClientBuilders.ConsumerBuilder(); - - builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance()) - .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic()) - .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit()); - - // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable) - builder.withSocketTimeout(fetchTimeout + 30000); - - if (busTopicParams.isUseHttps()) { - builder.usingHttps(); - - if (busTopicParams.isAllowSelfSignedCerts()) { - builder.allowSelfSignedCertificates(); - } - } - - if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) { - builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret()); - } - - if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) { - builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword()); - } - - try { - this.consumer = builder.build(); - } catch (MalformedURLException | GeneralSecurityException e) { - throw new IllegalArgumentException(e); - } - } - - @Override - public Iterable<String> fetch() throws IOException { - try { - return getCurrentConsumer().fetch(); - } catch (final IOException e) { //NOSONAR - logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(), - this.fetchTimeout); - sleepAfterFetchFailure(); - throw e; + if (this.fetchTimeout <= 0) { + this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH; + } else { + // don't sleep too long, even if fetch timeout is large + this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH); } } - private void sleepAfterFetchFailure() { + /** + * Causes the thread to sleep; invoked after fetch() fails. If the consumer is closed, + * or the thread is interrupted, then this will return immediately. + */ + protected void sleepAfterFetchFailure() { try { - this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR + logger.info("{}: backoff for {}ms", this, sleepTime); + if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) { + logger.info("{}: closed while handling fetch error", this); + } } catch (InterruptedException e) { logger.warn("{}: interrupted while handling fetch error", this, e); @@ -191,371 +129,148 @@ public interface BusConsumer { @Override public void close() { this.closeCondition.countDown(); - getCurrentConsumer().close(); - } - - private CambriaConsumer getCurrentConsumer() { - CambriaConsumer old = null; - CambriaConsumer ret; - - synchronized (consLocker) { - if (this.newConsumer != null) { - // replace old consumer with new consumer - old = this.consumer; - this.consumer = this.newConsumer; - this.newConsumer = null; - } - - ret = this.consumer; - } - - if (old != null) { - old.close(); - } - - return ret; - } - - @Override - public void setFilter(String filter) { - logger.info("{}: setting DMAAP server-side filter: {}", this, filter); - builder.withServerSideFilter(filter); - - try { - CambriaConsumer previous; - synchronized (consLocker) { - previous = this.newConsumer; - this.newConsumer = builder.build(); - } - - if (previous != null) { - // there was already a new consumer - close it - previous.close(); - } - - } catch (MalformedURLException | GeneralSecurityException e) { - /* - * Since an exception occurred, "consumer" still has its old value, thus it should - * not be closed at this point. - */ - throw new IllegalArgumentException(e); - } - } - - @Override - public String toString() { - return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]"; } } /** - * MR based consumer. + * Kafka based consumer. */ - public abstract class DmaapConsumerWrapper implements BusConsumer { + class KafkaConsumerWrapper extends FetchingBusConsumer { /** * logger. */ - private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class); + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class); - /** - * Name of the "protocol" property. - */ - protected static final String PROTOCOL_PROP = "Protocol"; + private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; /** - * fetch timeout. + * Kafka consumer. */ - protected int fetchTimeout; + protected KafkaConsumer<String, String> consumer; + protected Properties kafkaProps; - /** - * close condition. - */ - protected CountDownLatch closeCondition = new CountDownLatch(1); + protected boolean allowTracing; /** - * MR Consumer. - */ - protected MRConsumerImpl consumer; - - /** - * MR Consumer Wrapper. + * Kafka Consumer Wrapper. + * BusTopicParam - object contains the following parameters + * servers - messaging bus hosts. + * topic - topic * - * <p>servers messaging bus hosts - * topic topic - * apiKey API Key - * apiSecret API Secret - * username AAF Login - * password AAF Password - * consumerGroup Consumer Group - * consumerInstance Consumer Instance - * fetchTimeout Fetch Timeout - * fetchLimit Fetch Limit - * - * @param busTopicParams contains above listed attributes - * @throws MalformedURLException URL should be valid + * @param busTopicParams - The parameters for the bus topic */ - public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { - - this.fetchTimeout = busTopicParams.getFetchTimeout(); + public KafkaConsumerWrapper(BusTopicParams busTopicParams) { + super(busTopicParams); if (busTopicParams.isTopicInvalid()) { - throw new IllegalArgumentException("No topic for DMaaP"); + throw new IllegalArgumentException("No topic for Kafka"); } - this.consumer = new MRConsumerImplBuilder() - .setHostPart(busTopicParams.getServers()) - .setTopic(busTopicParams.getTopic()) - .setConsumerGroup(busTopicParams.getConsumerGroup()) - .setConsumerId(busTopicParams.getConsumerInstance()) - .setTimeoutMs(busTopicParams.getFetchTimeout()) - .setLimit(busTopicParams.getFetchLimit()) - .setApiKey(busTopicParams.getApiKey()) - .setApiSecret(busTopicParams.getApiSecret()) - .createMRConsumerImpl(); - - this.consumer.setUsername(busTopicParams.getUserName()); - this.consumer.setPassword(busTopicParams.getPassword()); - } - - @Override - public Iterable<String> fetch() throws IOException { - final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse(); - if (response == null) { - logger.warn("{}: DMaaP NULL response received", this); - - sleepAfterFetchFailure(); - return new ArrayList<>(); - } else { - logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), - response.getResponseMessage()); - - if (!"200".equals(response.getResponseCode())) { - - logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(), - response.getResponseMessage()); + //Setup Properties for consumer + kafkaProps = new Properties(); + kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + busTopicParams.getServers().get(0)); - sleepAfterFetchFailure(); - - /* fall through */ - } + if (busTopicParams.isAdditionalPropsValid()) { + kafkaProps.putAll(busTopicParams.getAdditionalProps()); } - if (response.getActualMessages() == null) { - return new ArrayList<>(); - } else { - return response.getActualMessages(); + if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) { + kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER); } - } - - private void sleepAfterFetchFailure() { - try { - this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR - - } catch (InterruptedException e) { - logger.warn("{}: interrupted while handling fetch error", this, e); - Thread.currentThread().interrupt(); + if (kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) { + kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER); } - } - - @Override - public void close() { - this.closeCondition.countDown(); - this.consumer.close(); - } - - @Override - public String toString() { - return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate() - + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost() - + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()=" - + consumer.getUsername() + "]"; - } - } - - /** - * MR based consumer. - */ - public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper { - - private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class); - - private final Properties props; - - /** - * BusTopicParams contain the following parameters. - * MR Consumer Wrapper. - * - * <p>servers messaging bus hosts - * topic topic - * apiKey API Key - * apiSecret API Secret - * aafLogin AAF Login - * aafPassword AAF Password - * consumerGroup Consumer Group - * consumerInstance Consumer Instance - * fetchTimeout Fetch Timeout - * fetchLimit Fetch Limit - * - * @param busTopicParams contains above listed params - * @throws MalformedURLException URL should be valid - */ - public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { - - super(busTopicParams); - - // super constructor sets servers = {""} if empty to avoid errors when using DME2 - if (busTopicParams.isServersInvalid()) { - throw new IllegalArgumentException("Must provide at least one host for HTTP AAF"); + if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) { + kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup()); } - - this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - - props = new Properties(); - - if (busTopicParams.isUseHttps()) { - props.setProperty(PROTOCOL_PROP, "https"); - this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905"); - - } else { - props.setProperty(PROTOCOL_PROP, "http"); - this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904"); + if (busTopicParams.isAllowTracing()) { + this.allowTracing = true; + kafkaProps.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, + TracingConsumerInterceptor.class.getName()); } - this.consumer.setProps(props); - logger.info("{}: CREATION", this); + consumer = new KafkaConsumer<>(kafkaProps); + //Subscribe to the topic + consumer.subscribe(List.of(busTopicParams.getTopic())); } @Override - public String toString() { - final MRConsumerImpl consumer = this.consumer; - - return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate() - + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost() - + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()=" - + consumer.getUsername() + "]"; - } - } - - public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper { - - private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class); - - private final Properties props; - - /** - * Constructor. - * - * @param busTopicParams topic paramters - * - * @throws MalformedURLException must provide a valid URL - */ - public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { - - - super(busTopicParams); - - - final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid() - ? busTopicParams.getAdditionalProps().get( - PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY) - : null); - - if (busTopicParams.isEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isAftEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isLatitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - } - if (busTopicParams.isLongitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + public Iterable<String> fetch() { + ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout)); + if (records == null || records.count() <= 0) { + return Collections.emptyList(); } + List<String> messages = new ArrayList<>(records.count()); + try { + if (allowTracing) { + createParentTraceContext(records); + } - if ((busTopicParams.isPartnerInvalid()) - && StringUtils.isBlank(dme2RouteOffer)) { - throw new IllegalArgumentException( - "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " - + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); + for (TopicPartition partition : records.partitions()) { + List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); + for (ConsumerRecord<String, String> partitionRecord : partitionRecords) { + messages.add(partitionRecord.value()); + } + long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); + consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); + } + } catch (Exception e) { + logger.error("{}: cannot fetch, throwing exception after sleep...", this); + sleepAfterFetchFailure(); + throw e; } + return messages; + } - final String serviceName = busTopicParams.getServers().get(0); - - this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); - - this.consumer.setUsername(busTopicParams.getUserName()); - this.consumer.setPassword(busTopicParams.getPassword()); - - props = new Properties(); - - props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName); - - props.setProperty("username", busTopicParams.getUserName()); - props.setProperty("password", busTopicParams.getPassword()); - - /* These are required, no defaults */ - props.setProperty("topic", busTopicParams.getTopic()); - - props.setProperty("Environment", busTopicParams.getEnvironment()); - props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment()); + private void createParentTraceContext(ConsumerRecords<String, String> records) { + TraceParentInfo traceParentInfo = new TraceParentInfo(); + for (ConsumerRecord<String, String> consumerRecord : records) { - if (busTopicParams.getPartner() != null) { - props.setProperty("Partner", busTopicParams.getPartner()); + Headers consumerRecordHeaders = consumerRecord.headers(); + traceParentInfo = processTraceParentHeader(consumerRecordHeaders); } - if (dme2RouteOffer != null) { - props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - } - - props.setProperty("Latitude", busTopicParams.getLatitude()); - props.setProperty("Longitude", busTopicParams.getLongitude()); - - /* These are optional, will default to these values if not set in additionalProps */ - props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); - props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); - props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); - props.setProperty("Version", "1.0"); - props.setProperty("SubContextPath", "/"); - props.setProperty("sessionstickinessrequired", "no"); - /* These should not change */ - props.setProperty("TransportType", "DME2"); - props.setProperty("MethodType", "GET"); + SpanContext spanContext = SpanContext.createFromRemoteParent( + traceParentInfo.getTraceId(), traceParentInfo.getSpanId(), + TraceFlags.getSampled(), TraceState.builder().build()); - if (busTopicParams.isUseHttps()) { - props.setProperty(PROTOCOL_PROP, "https"); - - } else { - props.setProperty(PROTOCOL_PROP, "http"); - } + Context.current().with(Span.wrap(spanContext)).makeCurrent(); + } - props.setProperty("contenttype", "application/json"); + private TraceParentInfo processTraceParentHeader(Headers headers) { + TraceParentInfo traceParentInfo = new TraceParentInfo(); + if (headers.lastHeader("traceparent") != null) { + traceParentInfo.setParentTraceId(new String(headers.lastHeader( + "traceparent").value(), StandardCharsets.UTF_8)); - if (busTopicParams.isAdditionalPropsValid()) { - for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) { - props.put(entry.getKey(), entry.getValue()); - } + String[] parts = traceParentInfo.getParentTraceId().split("-"); + traceParentInfo.setTraceId(parts[1]); + traceParentInfo.setSpanId(parts[2]); } - MRClientFactory.prop = props; - this.consumer.setProps(props); + return traceParentInfo; + } - logger.info("{}: CREATION", this); + @Data + @NoArgsConstructor + private static class TraceParentInfo { + private String parentTraceId; + private String traceId; + private String spanId; } - private IllegalArgumentException parmException(String topic, String propnm) { - return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + propnm + " property for DME2 in DMaaP"); + @Override + public void close() { + super.close(); + this.consumer.close(); + logger.info("Kafka Consumer exited {}", this); + } + @Override + public String toString() { + return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]"; } } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index 09d52946..1b57e48e 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -2,8 +2,10 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. + * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2022-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,28 +23,22 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; -import com.att.nsa.apiClient.http.HttpClient.ConnectionType; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor; import java.util.Properties; -import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.StringUtils; -import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher; -import org.onap.dmaap.mr.client.response.MRPublisherResponse; -import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.onap.policy.common.gson.annotation.GsonJsonIgnore; +import java.util.UUID; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public interface BusPublisher { + String NO_MESSAGE_PROVIDED = "No message provided"; + String LOG_CLOSE = "{}: CLOSE"; + String LOG_CLOSE_FAILED = "{}: CLOSE FAILED"; + /** * sends a message. * @@ -51,72 +47,76 @@ public interface BusPublisher { * @return true if success, false otherwise * @throws IllegalArgumentException if no message provided */ - public boolean send(String partitionId, String message); + boolean send(String partitionId, String message); /** * closes the publisher. */ - public void close(); + void close(); /** - * Cambria based library publisher. + * Kafka based library publisher. */ - public static class CambriaPublisherWrapper implements BusPublisher { + class KafkaPublisherWrapper implements BusPublisher { + + private static final Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class); + private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; - private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class); + private final String topic; /** - * The actual Cambria publisher. + * Kafka publisher. */ - @GsonJsonIgnore - protected CambriaBatchingPublisher publisher; + private final Producer<String, String> producer; + protected Properties kafkaProps; /** - * Constructor. + * Kafka Publisher Wrapper. * * @param busTopicParams topic parameters */ - public CambriaPublisherWrapper(BusTopicParams busTopicParams) { - - PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder(); - - builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic()); + protected KafkaPublisherWrapper(BusTopicParams busTopicParams) { - // Set read timeout to 30 seconds (TBD: this should be configurable) - builder.withSocketTimeout(30000); - - if (busTopicParams.isUseHttps()) { - if (busTopicParams.isAllowSelfSignedCerts()) { - builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION); - } else { - builder.withConnectionType(ConnectionType.HTTPS); - } + if (busTopicParams.isTopicInvalid()) { + throw new IllegalArgumentException("No topic for Kafka"); } + this.topic = busTopicParams.getTopic(); - if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) { - builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret()); + // Setup Properties for consumer + kafkaProps = new Properties(); + kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0)); + if (busTopicParams.isAdditionalPropsValid()) { + kafkaProps.putAll(busTopicParams.getAdditionalProps()); } - - if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) { - builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword()); + if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) { + kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER); + } + if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) { + kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER); } - try { - this.publisher = builder.build(); - } catch (MalformedURLException | GeneralSecurityException e) { - throw new IllegalArgumentException(e); + if (busTopicParams.isAllowTracing()) { + kafkaProps.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, + TracingProducerInterceptor.class.getName()); } + + producer = new KafkaProducer<>(kafkaProps); } @Override public boolean send(String partitionId, String message) { if (message == null) { - throw new IllegalArgumentException("No message provided"); + throw new IllegalArgumentException(NO_MESSAGE_PROVIDED); } try { - this.publisher.send(partitionId, message); + // Create the record + ProducerRecord<String, String> producerRecord = + new ProducerRecord<>(topic, UUID.randomUUID().toString(), message); + + this.producer.send(producerRecord); + producer.flush(); } catch (Exception e) { logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e); return false; @@ -126,263 +126,19 @@ public interface BusPublisher { @Override public void close() { - logger.info("{}: CLOSE", this); - - try { - this.publisher.close(); - } catch (Exception e) { - logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e); - } - } - - - @Override - public String toString() { - return "CambriaPublisherWrapper []"; - } - - } - - /** - * DmaapClient library wrapper. - */ - public abstract class DmaapPublisherWrapper implements BusPublisher { - - private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class); - - /** - * MR based Publisher. - */ - protected MRSimplerBatchPublisher publisher; - protected Properties props; - - /** - * MR Publisher Wrapper. - * - * @param servers messaging bus hosts - * @param topic topic - * @param username AAF or DME2 Login - * @param password AAF or DME2 Password - */ - public DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic, - String username, String password, boolean useHttps) { - - - if (StringUtils.isBlank(topic)) { - throw new IllegalArgumentException("No topic for DMaaP"); - } - - - configureProtocol(topic, protocol, servers, useHttps); - - this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName())); - - this.publisher.setUsername(username); - this.publisher.setPassword(password); - - props = new Properties(); - - props.setProperty("Protocol", (useHttps ? "https" : "http")); - props.setProperty("contenttype", "application/json"); - props.setProperty("username", username); - props.setProperty("password", password); - - props.setProperty("topic", topic); - - this.publisher.setProps(props); - - if (protocol == ProtocolTypeConstants.AAF_AUTH) { - this.publisher.setHost(servers.get(0)); - } - - logger.info("{}: CREATION: using protocol {}", this, protocol.getValue()); - } - - private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers, - boolean useHttps) { - - if (protocol == ProtocolTypeConstants.AAF_AUTH) { - if (servers == null || servers.isEmpty()) { - throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided"); - } - - ArrayList<String> dmaapServers = new ArrayList<>(); - String port = useHttps ? ":3905" : ":3904"; - for (String server : servers) { - dmaapServers.add(server + port); - } - - - this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build(); - - this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - - } else if (protocol == ProtocolTypeConstants.DME2) { - ArrayList<String> dmaapServers = new ArrayList<>(); - dmaapServers.add("0.0.0.0:3904"); - - this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build(); - - this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); - - } else { - throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol); - } - } - - @Override - public void close() { - logger.info("{}: CLOSE", this); + logger.info(LOG_CLOSE, this); try { - this.publisher.close(1, TimeUnit.SECONDS); + this.producer.close(); } catch (Exception e) { logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e); } } @Override - public boolean send(String partitionId, String message) { - if (message == null) { - throw new IllegalArgumentException("No message provided"); - } - - this.publisher.setPubResponse(new MRPublisherResponse()); - this.publisher.send(partitionId, message); - MRPublisherResponse response = this.publisher.sendBatchWithResponse(); - if (response != null) { - logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(), - response.getResponseMessage()); - } - - return true; - } - - @Override public String toString() { - return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate() - + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()=" - + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag() - + ", publisher.getUsername()=" + publisher.getUsername() + "]"; + return "KafkaPublisherWrapper []"; } - } - - /** - * DmaapClient library wrapper. - */ - public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper { - /** - * MR based Publisher. - */ - public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword, - boolean useHttps) { - - super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps); - } - } - - public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper { - - /** - * Constructor. - * - * @param busTopicParams topic parameters - */ - public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) { - - super(ProtocolTypeConstants.DME2, busTopicParams.getServers(), busTopicParams.getTopic(), - busTopicParams.getUserName(), busTopicParams.getPassword(), busTopicParams.isUseHttps()); - - String dme2RouteOffer = busTopicParams.isAdditionalPropsValid() - ? busTopicParams.getAdditionalProps().get( - PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY) - : null; - validateParams(busTopicParams, dme2RouteOffer); - - String serviceName = busTopicParams.getServers().get(0); - - /* These are required, no defaults */ - props.setProperty("Environment", busTopicParams.getEnvironment()); - props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment()); - - props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName); - - if (busTopicParams.getPartner() != null) { - props.setProperty("Partner", busTopicParams.getPartner()); - } - if (dme2RouteOffer != null) { - props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - } - - props.setProperty("Latitude", busTopicParams.getLatitude()); - props.setProperty("Longitude", busTopicParams.getLongitude()); - - // ServiceName also a default, found in additionalProps - - /* These are optional, will default to these values if not set in optionalProps */ - props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); - props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); - props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); - props.setProperty("Version", "1.0"); - props.setProperty("SubContextPath", "/"); - props.setProperty("sessionstickinessrequired", "no"); - - /* These should not change */ - props.setProperty("TransportType", "DME2"); - props.setProperty("MethodType", "POST"); - - if (busTopicParams.isAdditionalPropsValid()) { - addAdditionalProps(busTopicParams); - } - - this.publisher.setProps(props); - } - - private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) { - if (busTopicParams.isEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isAftEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isLatitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - } - if (busTopicParams.isLongitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - } - - if ((busTopicParams.isPartnerInvalid()) - && StringUtils.isBlank(dme2RouteOffer)) { - throw new IllegalArgumentException( - "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " - + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); - } - } - - private void addAdditionalProps(BusTopicParams busTopicParams) { - for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - - if (value != null) { - props.setProperty(key, value); - } - } - } - - private IllegalArgumentException parmException(String topic, String propnm) { - return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + propnm + " property for DME2 in DMaaP"); - - } } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java index ccf25753..f8236d3d 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java @@ -2,14 +2,15 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,11 +21,13 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import lombok.Getter; import org.onap.policy.common.endpoints.event.comm.bus.ApiKeyEnabled; /** * Bus Topic Base. */ +@Getter public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled { /** @@ -43,58 +46,37 @@ public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled { protected boolean useHttps; /** + * Allow tracing. + */ + protected boolean allowTracing; + + /** * allow self signed certificates. */ protected boolean allowSelfSignedCerts; /** * Instantiates a new Bus Topic Base. - * + * * <p>servers list of servers * topic topic name * apiKey API Key * apiSecret API Secret * useHttps does connection use HTTPS? + * allowTracing Is tracing allowed? * allowSelfSignedCerts are self-signed certificates allow * @param busTopicParams holds all our parameters * @throws IllegalArgumentException if invalid parameters are present */ - public BusTopicBase(BusTopicParams busTopicParams) { + protected BusTopicBase(BusTopicParams busTopicParams) { super(busTopicParams.getServers(), busTopicParams.getTopic(), busTopicParams.getEffectiveTopic()); this.apiKey = busTopicParams.getApiKey(); this.apiSecret = busTopicParams.getApiSecret(); this.useHttps = busTopicParams.isUseHttps(); + this.allowTracing = busTopicParams.isAllowTracing(); this.allowSelfSignedCerts = busTopicParams.isAllowSelfSignedCerts(); } - @Override - public String getApiKey() { - return apiKey; - } - - @Override - public String getApiSecret() { - return apiSecret; - } - - /** - * Is using HTTPS. - * - * @return if using https - */ - public boolean isUseHttps() { - return useHttps; - } - - /** - * Is self signed certificates allowed. - * - * @return if self signed certificates are allowed - */ - public boolean isAllowSelfSignedCerts() { - return allowSelfSignedCerts; - } - protected boolean anyNullOrEmpty(String... args) { for (String arg : args) { if (arg == null || arg.isEmpty()) { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java index 9df7221f..53a6ab66 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java @@ -3,8 +3,8 @@ * ONAP * ================================================================================ * Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved. - * Modifications Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2019, 2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,21 +24,23 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; import java.util.List; import java.util.Map; +import lombok.AccessLevel; import lombok.Getter; +import lombok.NoArgsConstructor; import lombok.Setter; import org.apache.commons.lang3.StringUtils; /** * Member variables of this Params class are as follows. * - * <p>servers DMaaP servers - * topic DMaaP Topic to be monitored - * apiKey DMaaP API Key (optional) - * apiSecret DMaaP API Secret (optional) - * consumerGroup DMaaP Reader Consumer Group - * consumerInstance DMaaP Reader Instance - * fetchTimeout DMaaP fetch timeout - * fetchLimit DMaaP fetch limit + * <p>servers Kafka servers + * topic Kafka Topic to be monitored + * apiKey Kafka API Key (optional) + * apiSecret Kafka API Secret (optional) + * consumerGroup kafka Reader Consumer Group + * consumerInstance Kafka Reader Instance + * fetchTimeout kafka fetch timeout + * fetchLimit Kafka fetch limit * environment DME2 Environment * aftEnvironment DME2 AFT Environment * partner DME2 Partner @@ -46,6 +48,7 @@ import org.apache.commons.lang3.StringUtils; * longitude DME2 Longitude * additionalProps Additional properties to pass to DME2 * useHttps does connection use HTTPS? + * allowTracing is message tracing allowed? * allowSelfSignedCerts are self-signed certificates allow */ @Getter @@ -64,6 +67,7 @@ public class BusTopicParams { private int fetchTimeout; private int fetchLimit; private boolean useHttps; + private boolean allowTracing; private boolean allowSelfSignedCerts; private boolean managed; @@ -78,6 +82,7 @@ public class BusTopicParams { private String clientName; private String hostname; private String basePath; + @Getter private String serializationProvider; public static TopicParamsBuilder builder() { @@ -165,29 +170,43 @@ public class BusTopicParams { return additionalProps != null; } - public String getSerializationProvider() { - return serializationProvider; + public void setEffectiveTopic(String effectiveTopic) { + this.effectiveTopic = topicToLowerCase(effectiveTopic); } + public void setTopic(String topic) { + this.topic = topicToLowerCase(topic); + } + + public String getEffectiveTopic() { + return topicToLowerCase(effectiveTopic); + } + + public String getTopic() { + return topicToLowerCase(topic); + } + + private String topicToLowerCase(String topic) { + return (topic == null || topic.isEmpty()) ? topic : topic.toLowerCase(); + } + + @NoArgsConstructor(access = AccessLevel.PRIVATE) public static class TopicParamsBuilder { final BusTopicParams params = new BusTopicParams(); - private TopicParamsBuilder() { - } - public TopicParamsBuilder servers(List<String> servers) { this.params.servers = servers; return this; } public TopicParamsBuilder topic(String topic) { - this.params.topic = topic; + this.params.setTopic(topic); return this; } public TopicParamsBuilder effectiveTopic(String effectiveTopic) { - this.params.effectiveTopic = effectiveTopic; + this.params.setEffectiveTopic(effectiveTopic); return this; } @@ -226,6 +245,11 @@ public class BusTopicParams { return this; } + public TopicParamsBuilder allowTracing(boolean allowTracing) { + this.params.allowTracing = allowTracing; + return this; + } + public TopicParamsBuilder allowSelfSignedCerts(boolean allowSelfSignedCerts) { this.params.allowSelfSignedCerts = allowSelfSignedCerts; return this; @@ -309,7 +333,6 @@ public class BusTopicParams { this.params.serializationProvider = serializationProvider; return this; } - } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java index 00bd9e3a..9b724072 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java @@ -2,8 +2,10 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd. + * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +24,8 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; import java.util.UUID; +import lombok.Getter; +import lombok.Setter; import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSink; import org.onap.policy.common.endpoints.utils.NetLoggerUtil; import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; @@ -29,8 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Transport Agnostic Bus Topic Sink to carry out the core functionality to interact with a sink - * regardless if it is UEB or DMaaP. + * Transport Agnostic Bus Topic Sink to carry out the core functionality to interact with a sink. * */ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink { @@ -43,7 +46,9 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi /** * The partition key to publish to. */ - protected String partitionId; + @Getter + @Setter + protected String partitionKey; /** * Message bus publisher. @@ -59,17 +64,18 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi * apiSecret api secret * partitionId partition id * useHttps does connection use HTTPS? + * allowTracing is tracing allowed? * allowSelfSignedCerts are self-signed certificates allow * - * @throws IllegalArgumentException in invalid parameters are passed in + * @throws IllegalArgumentException if invalid parameters are passed in */ - public InlineBusTopicSink(BusTopicParams busTopicParams) { + protected InlineBusTopicSink(BusTopicParams busTopicParams) { super(busTopicParams); if (busTopicParams.isPartitionIdInvalid()) { - this.partitionId = UUID.randomUUID().toString(); + this.partitionKey = UUID.randomUUID().toString(); } else { - this.partitionId = busTopicParams.getPartitionId(); + this.partitionKey = busTopicParams.getPartitionId(); } } @@ -138,7 +144,7 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi NetLoggerUtil.log(EventType.OUT, this.getTopicCommInfrastructure(), this.topic, message); - publisher.send(this.partitionId, message); + publisher.send(this.partitionKey, message); broadcast(message); } catch (Exception e) { logger.warn("{}: cannot send because of {}", this, e.getMessage(), e); @@ -149,44 +155,13 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi } @Override - public void setPartitionKey(String partitionKey) { - this.partitionId = partitionKey; - } - - @Override - public String getPartitionKey() { - return this.partitionId; - } - - @Override public void shutdown() { this.stop(); } @Override - protected boolean anyNullOrEmpty(String... args) { - for (String arg : args) { - if (arg == null || arg.isEmpty()) { - return true; - } - } - - return false; - } - - @Override - protected boolean allNullOrEmpty(String... args) { - for (String arg : args) { - if (!(arg == null || arg.isEmpty())) { - return false; - } - } - - return true; - } - - @Override public String toString() { - return "InlineBusTopicSink [partitionId=" + partitionId + ", alive=" + alive + ", publisher=" + publisher + "]"; + return "InlineBusTopicSink [partitionId=" + partitionKey + ", alive=" + alive + ", publisher=" + publisher + + "]"; } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java deleted file mode 100644 index a7a692de..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java +++ /dev/null @@ -1,129 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. - * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal; - -import java.util.Map; -import org.onap.policy.common.endpoints.event.comm.Topic; -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This implementation publishes events for the associated DMAAP topic, inline with the calling - * thread. - */ -public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTopicSink { - - protected static Logger logger = LoggerFactory.getLogger(InlineDmaapTopicSink.class); - - protected final String userName; - protected final String password; - - protected String environment = null; - protected String aftEnvironment = null; - protected String partner = null; - protected String latitude = null; - protected String longitude = null; - - protected Map<String, String> additionalProps = null; - - /** - * BusTopicParams contains the below mentioned attributes. - * servers DMaaP servers - * topic DMaaP Topic to be monitored - * apiKey DMaaP API Key (optional) - * apiSecret DMaaP API Secret (optional) - * environment DME2 Environment - * aftEnvironment DME2 AFT Environment - * partner DME2 Partner - * latitude DME2 Latitude - * longitude DME2 Longitude - * additionalProps Additional properties to pass to DME2 - * useHttps does connection use HTTPS? - * allowSelfSignedCerts are self-signed certificates allow - * @param busTopicParams Contains the above mentioned parameters - * @throws IllegalArgumentException An invalid parameter passed in - */ - public InlineDmaapTopicSink(BusTopicParams busTopicParams) { - - super(busTopicParams); - - this.userName = busTopicParams.getUserName(); - this.password = busTopicParams.getPassword(); - - this.environment = busTopicParams.getEnvironment(); - this.aftEnvironment = busTopicParams.getAftEnvironment(); - this.partner = busTopicParams.getPartner(); - - this.latitude = busTopicParams.getLatitude(); - this.longitude = busTopicParams.getLongitude(); - - this.additionalProps = busTopicParams.getAdditionalProps(); - } - - - @Override - public void init() { - if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) { - this.publisher = new BusPublisher.CambriaPublisherWrapper(BusTopicParams.builder() - .servers(this.servers) - .topic(this.effectiveTopic) - .apiKey(this.apiKey) - .apiSecret(this.apiSecret) - .userName(this.userName) - .password(this.password) - .useHttps(this.useHttps) - .allowSelfSignedCerts(this.allowSelfSignedCerts) - .build()); - } else { - this.publisher = new BusPublisher.DmaapDmePublisherWrapper(BusTopicParams.builder() - .servers(this.servers) - .topic(this.effectiveTopic) - .userName(this.userName) - .password(this.password) - .environment(this.environment) - .aftEnvironment(this.aftEnvironment) - .partner(this.partner) - .latitude(this.latitude) - .longitude(this.longitude) - .additionalProps(this.additionalProps) - .useHttps(this.useHttps) - .build()); - } - - logger.info("{}: DMAAP SINK created", this); - } - - @Override - public CommInfrastructure getTopicCommInfrastructure() { - return Topic.CommInfrastructure.DMAAP; - } - - - @Override - public String toString() { - return "InlineDmaapTopicSink [userName=" + userName + ", password=" + password - + ", getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()=" + super.toString() - + "]"; - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java index f258d5d9..6354f762 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java @@ -1,9 +1,6 @@ /* * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. + * Copyright (C) 2022-2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,38 +18,39 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import java.util.Map; import org.onap.policy.common.endpoints.event.comm.Topic; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * This implementation publishes events for the associated UEB topic, inline with the calling + * This implementation publishes events for the associated KAFKA topic, inline with the calling * thread. */ -public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSink { +public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTopicSink { /** * Logger. */ - private static Logger logger = LoggerFactory.getLogger(InlineUebTopicSink.class); + private static final Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class); + + protected Map<String, String> additionalProps; /** - * Argument-based UEB Topic Writer instantiation. BusTopicParams contains below mentioned + * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains the below * attributes. * - * <p>servers list of UEB servers available for publishing + * <p>servers list of KAFKA servers available for publishing * topic the topic to publish to - * apiKey the api key (optional) - * apiSecret the api secret (optional) * partitionId the partition key (optional, autogenerated if not provided) * useHttps does connection use HTTPS? - * allowSelfSignedCerts are self-signed certificates allow * @param busTopicParams contains attributes needed * @throws IllegalArgumentException if invalid arguments are detected */ - public InlineUebTopicSink(BusTopicParams busTopicParams) { + public InlineKafkaTopicSink(BusTopicParams busTopicParams) { super(busTopicParams); + this.additionalProps = busTopicParams.getAdditionalProps(); } /** @@ -61,27 +59,24 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi @Override public void init() { - this.publisher = new BusPublisher.CambriaPublisherWrapper(BusTopicParams.builder() + this.publisher = new BusPublisher.KafkaPublisherWrapper(BusTopicParams.builder() .servers(this.servers) .topic(this.effectiveTopic) - .apiKey(this.apiKey) - .apiSecret(this.apiSecret) .useHttps(this.useHttps) - .allowSelfSignedCerts(this.allowSelfSignedCerts) + .allowTracing(this.allowTracing) + .additionalProps(this.additionalProps) .build()); - logger.info("{}: UEB SINK created", this); + logger.info("{}: KAFKA SINK created", this); } @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("InlineUebTopicSink [getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()) - .append(", toString()=").append(super.toString()).append("]"); - return builder.toString(); + return "InlineKafkaTopicSink [getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()=" + + super.toString() + "]"; } @Override public CommInfrastructure getTopicCommInfrastructure() { - return Topic.CommInfrastructure.UEB; + return Topic.CommInfrastructure.KAFKA; } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java index e52204f6..f98b481f 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -2,8 +2,9 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd. + * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,10 +25,9 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; import java.io.IOException; import java.net.MalformedURLException; import java.util.UUID; -import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource; +import lombok.Getter; import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.onap.policy.common.endpoints.utils.NetLoggerUtil; import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; @@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory; * notifying its listeners. */ public abstract class SingleThreadedBusTopicSource extends BusTopicBase - implements Runnable, BusTopicSource, FilterableTopicSource { + implements Runnable, BusTopicSource { /** * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only @@ -51,21 +51,25 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase /** * Bus consumer group. */ + @Getter protected final String consumerGroup; /** * Bus consumer instance. */ + @Getter protected final String consumerInstance; /** * Bus fetch timeout. */ + @Getter protected final int fetchTimeout; /** * Bus fetch limit. */ + @Getter protected final int fetchLimit; /** @@ -86,19 +90,24 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase * * @throws IllegalArgumentException An invalid parameter passed in */ - public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) { + protected SingleThreadedBusTopicSource(BusTopicParams busTopicParams) { super(busTopicParams); - if (busTopicParams.isConsumerGroupInvalid()) { + if (busTopicParams.isConsumerGroupInvalid() && busTopicParams.isConsumerInstanceInvalid()) { this.consumerGroup = UUID.randomUUID().toString(); - } else { + this.consumerInstance = NetworkUtil.getHostname(); + + } else if (busTopicParams.isConsumerGroupInvalid()) { + this.consumerGroup = UUID.randomUUID().toString(); + this.consumerInstance = busTopicParams.getConsumerInstance(); + + } else if (busTopicParams.isConsumerInstanceInvalid()) { this.consumerGroup = busTopicParams.getConsumerGroup(); - } + this.consumerInstance = UUID.randomUUID().toString(); - if (busTopicParams.isConsumerInstanceInvalid()) { - this.consumerInstance = NetworkUtil.getHostname(); } else { + this.consumerGroup = busTopicParams.getConsumerGroup(); this.consumerInstance = busTopicParams.getConsumerInstance(); } @@ -262,17 +271,6 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase return broadcast(event); } - - @Override - public void setFilter(String filter) { - if (consumer instanceof FilterableBusConsumer) { - ((FilterableBusConsumer) consumer).setFilter(filter); - - } else { - throw new UnsupportedOperationException("no server-side filtering for topic " + topic); - } - } - @Override public String toString() { return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance @@ -282,29 +280,8 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase } @Override - public String getConsumerGroup() { - return consumerGroup; - } - - @Override - public String getConsumerInstance() { - return consumerInstance; - } - - @Override public void shutdown() { this.stop(); this.topicListeners.clear(); } - - @Override - public int getFetchTimeout() { - return fetchTimeout; - } - - @Override - public int getFetchLimit() { - return fetchLimit; - } - } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java deleted file mode 100644 index c5f2b5bc..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java +++ /dev/null @@ -1,140 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal; - -import java.net.MalformedURLException; -import java.util.Map; -import org.onap.policy.common.endpoints.event.comm.Topic; -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This topic reader implementation specializes in reading messages over DMAAP topic and notifying - * its listeners. - */ -public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource implements DmaapTopicSource, Runnable { - - private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class); - - - protected final String userName; - protected final String password; - - protected String environment = null; - protected String aftEnvironment = null; - protected String partner = null; - protected String latitude = null; - protected String longitude = null; - - protected Map<String, String> additionalProps = null; - - - /** - * Constructor. - * - * @param busTopicParams Parameters object containing all the required inputs - * - * @throws IllegalArgumentException An invalid parameter passed in - */ - public SingleThreadedDmaapTopicSource(BusTopicParams busTopicParams) { - - super(busTopicParams); - - this.userName = busTopicParams.getUserName(); - this.password = busTopicParams.getPassword(); - - this.environment = busTopicParams.getEnvironment(); - this.aftEnvironment = busTopicParams.getAftEnvironment(); - this.partner = busTopicParams.getPartner(); - - this.latitude = busTopicParams.getLatitude(); - this.longitude = busTopicParams.getLongitude(); - - this.additionalProps = busTopicParams.getAdditionalProps(); - try { - this.init(); - } catch (Exception e) { - throw new IllegalArgumentException("ERROR during init in dmaap-source: cannot create topic " + topic, e); - } - } - - - /** - * Initialize the Cambria or MR Client. - */ - @Override - public void init() throws MalformedURLException { - BusTopicParams.TopicParamsBuilder builder = BusTopicParams.builder() - .servers(this.servers) - .topic(this.effectiveTopic) - .apiKey(this.apiKey) - .apiSecret(this.apiSecret) - .consumerGroup(this.consumerGroup) - .consumerInstance(this.consumerInstance) - .fetchTimeout(this.fetchTimeout) - .fetchLimit(this.fetchLimit) - .useHttps(this.useHttps); - - if (anyNullOrEmpty(this.userName, this.password)) { - this.consumer = new BusConsumer.CambriaConsumerWrapper(builder - .allowSelfSignedCerts(this.allowSelfSignedCerts) - .build()); - } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) { - this.consumer = new BusConsumer.CambriaConsumerWrapper(builder - .userName(this.userName) - .password(this.password) - .allowSelfSignedCerts(this.allowSelfSignedCerts) - .build()); - } else { - this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(builder - .userName(this.userName) - .password(this.password) - .environment(this.environment) - .aftEnvironment(this.aftEnvironment) - .partner(this.partner) - .latitude(this.latitude) - .longitude(this.longitude) - .additionalProps(this.additionalProps) - .build()); - } - - logger.info("{}: INITTED", this); - } - - @Override - public CommInfrastructure getTopicCommInfrastructure() { - return Topic.CommInfrastructure.DMAAP; - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=") - .append((password == null || password.isEmpty()) ? "-" : password.length()) - .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()).append(", toString()=") - .append(super.toString()).append("]"); - return builder.toString(); - } - - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java index e210762d..869273f0 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java @@ -1,9 +1,6 @@ /* * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd. + * Copyright (C) 2022-2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,14 +18,18 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import java.net.MalformedURLException; +import java.util.Map; import org.onap.policy.common.endpoints.event.comm.Topic; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource; /** - * This topic source implementation specializes in reading messages over an UEB Bus topic source and + * This topic source implementation specializes in reading messages over a Kafka Bus topic source and * notifying its listeners. */ -public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource implements UebTopicSource { +public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource implements KafkaTopicSource { + + protected Map<String, String> additionalProps = null; /** * Constructor. @@ -36,40 +37,43 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource i * @param busTopicParams Parameters object containing all the required inputs * @throws IllegalArgumentException An invalid parameter passed in */ - public SingleThreadedUebTopicSource(BusTopicParams busTopicParams) { + public SingleThreadedKafkaTopicSource(BusTopicParams busTopicParams) { super(busTopicParams); - this.init(); + this.additionalProps = busTopicParams.getAdditionalProps(); + try { + this.init(); + } catch (Exception e) { + throw new IllegalArgumentException("ERROR during init in kafka-source: cannot create topic " + topic, e); + } } /** * Initialize the Cambria client. */ @Override - public void init() { - this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder() + public void init() throws MalformedURLException { + BusTopicParams.TopicParamsBuilder builder = BusTopicParams.builder() .servers(this.servers) .topic(this.effectiveTopic) - .apiKey(this.apiKey) - .apiSecret(this.apiSecret) - .consumerGroup(this.consumerGroup) - .consumerInstance(this.consumerInstance) .fetchTimeout(this.fetchTimeout) - .fetchLimit(this.fetchLimit) + .consumerGroup(this.consumerGroup) .useHttps(this.useHttps) - .allowSelfSignedCerts(this.allowSelfSignedCerts).build()); + .allowTracing(this.allowTracing); + + this.consumer = new BusConsumer.KafkaConsumerWrapper(builder + .additionalProps(this.additionalProps) + .build()); } @Override public CommInfrastructure getTopicCommInfrastructure() { - return Topic.CommInfrastructure.UEB; + return Topic.CommInfrastructure.KAFKA; } @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("SingleThreadedUebTopicSource [getTopicCommInfrastructure()=") - .append(getTopicCommInfrastructure()).append(", toString()=").append(super.toString()).append("]"); - return builder.toString(); + return "SingleThreadedKafkaTopicSource [getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + + ", toString()=" + super.toString() + "]"; } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java index 2e0a9a46..c63fbcc2 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java @@ -2,7 +2,9 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,18 +24,21 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; import java.util.ArrayList; import java.util.List; +import lombok.AccessLevel; +import lombok.Getter; import org.apache.commons.collections4.queue.CircularFifoQueue; import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@Getter public abstract class TopicBase implements Topic { /** * Logger. */ - private static Logger logger = LoggerFactory.getLogger(TopicBase.class); + private static final Logger logger = LoggerFactory.getLogger(TopicBase.class); /** * List of servers. @@ -71,6 +76,7 @@ public abstract class TopicBase implements Topic { /** * All my subscribers for new message notifications. */ + @Getter(AccessLevel.NONE) protected final ArrayList<TopicListener> topicListeners = new ArrayList<>(); /** @@ -81,7 +87,7 @@ public abstract class TopicBase implements Topic { * * @throws IllegalArgumentException if invalid parameters are present */ - public TopicBase(List<String> servers, String topic) { + protected TopicBase(List<String> servers, String topic) { this(servers, topic, topic); } @@ -93,7 +99,7 @@ public abstract class TopicBase implements Topic { * * @throws IllegalArgumentException if invalid parameters are present */ - public TopicBase(List<String> servers, String topic, String effectiveTopic) { + protected TopicBase(List<String> servers, String topic, String effectiveTopic) { if (servers == null || servers.isEmpty()) { throw new IllegalArgumentException("Server(s) must be provided"); @@ -111,8 +117,8 @@ public abstract class TopicBase implements Topic { } this.servers = servers; - this.topic = topic; - this.effectiveTopic = effectiveTopicCopy; + this.topic = topic.toLowerCase(); + this.effectiveTopic = effectiveTopicCopy.toLowerCase(); } @Override @@ -158,7 +164,7 @@ public abstract class TopicBase implements Topic { protected boolean broadcast(String message) { List<TopicListener> snapshotListeners = this.snapshotTopicListeners(); - boolean success = true; + var success = true; for (TopicListener topicListener : snapshotListeners) { try { topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message); @@ -218,33 +224,8 @@ public abstract class TopicBase implements Topic { } @Override - public boolean isLocked() { - return this.locked; - } - - @Override - public String getTopic() { - return topic; - } - - @Override - public String getEffectiveTopic() { - return effectiveTopic; - } - - @Override - public boolean isAlive() { - return this.alive; - } - - @Override - public List<String> getServers() { - return servers; - } - - @Override public synchronized String[] getRecentEvents() { - String[] events = new String[recentEvents.size()]; + var events = new String[recentEvents.size()]; return recentEvents.toArray(events); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java index 57a331c1..4f601fa8 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java @@ -2,7 +2,8 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,8 +21,12 @@ package org.onap.policy.common.endpoints.event.comm.client; -import java.util.Arrays; +import jakarta.validation.constraints.NotNull; +import java.util.Collections; import java.util.List; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; import lombok.Getter; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; @@ -29,6 +34,11 @@ import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A "bidirectional" topic, which is a pair of topics, one of which is used to publish @@ -36,6 +46,9 @@ import org.onap.policy.common.endpoints.event.comm.TopicSource; */ @Getter public class BidirectionalTopicClient { + private static final Logger logger = LoggerFactory.getLogger(BidirectionalTopicClient.class); + private static final Coder coder = new StandardCoder(); + private final String sinkTopic; private final String sourceTopic; private final TopicSink sink; @@ -44,6 +57,16 @@ public class BidirectionalTopicClient { private final CommInfrastructure sourceTopicCommInfrastructure; /** + * Used when checking whether a message sent on the sink topic can be received + * on the source topic. When a matching message is received on the incoming topic, + * {@code true} is placed on the queue. If {@link #stopWaiting()} is called or the waiting + * thread is interrupted, then {@code false} is placed on the queue. Whenever a value + * is pulled from the queue, it is immediately placed back on the queue. + */ + private final BlockingDeque<Boolean> checkerQueue = new LinkedBlockingDeque<>(); + + + /** * Constructs the object. * * @param sinkTopic sink topic name @@ -51,8 +74,8 @@ public class BidirectionalTopicClient { * @throws BidirectionalTopicClientException if either topic does not exist */ public BidirectionalTopicClient(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException { - this.sinkTopic = sinkTopic; - this.sourceTopic = sourceTopic; + this.sinkTopic = sinkTopic.toLowerCase(); + this.sourceTopic = sourceTopic.toLowerCase(); // init sinkClient List<TopicSink> sinks = getTopicEndpointManager().getTopicSinks(sinkTopic); @@ -65,7 +88,7 @@ public class BidirectionalTopicClient { this.sink = sinks.get(0); // init source - List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic)); + List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Collections.singletonList(sourceTopic)); if (sources.isEmpty()) { throw new BidirectionalTopicClientException("no sources for topic: " + sourceTopic); } else if (sources.size() > 1) { @@ -94,9 +117,108 @@ public class BidirectionalTopicClient { source.unregister(topicListener); } + /** + * Determines whether the topic is ready (i.e., {@link #awaitReady(Object, long)} has + * previously returned {@code true}). + * + * @return {@code true}, if the topic is ready to send and receive + */ + public boolean isReady() { + return Boolean.TRUE.equals(checkerQueue.peek()); + } + + /** + * Waits for the bidirectional topic to become "ready" by publishing a message on the + * sink topic and awaiting receipt of the message on the source topic. If the message + * is not received within a few seconds, then it tries again. This process is + * continued until the message is received, {@link #stopWaiting()} is called, or this thread + * is interrupted. Once this returns, subsequent calls will return immediately, always + * with the same value. + * + * @param message message to be sent to the sink topic. Note: the equals() method must + * return {@code true} if and only if two messages are the same + * @param waitMs time to wait, in milliseconds, before re-sending the message + * @return {@code true} if the message was received from the source topic, + * {@code false} if this method was stopped or interrupted before receipt of + * the message + * @throws CoderException if the message cannot be encoded + */ + public synchronized <T> boolean awaitReady(T message, long waitMs) throws CoderException { + // see if we already know the answer + if (!checkerQueue.isEmpty()) { + return checkerQueue.peek(); + } + + final String messageText = coder.encode(message); + + // class of message to be decoded + final TopicListener listener = getTopicListener(message); + + source.register(listener); + + // loop until the message is received + try { + Boolean result; + do { + send(messageText); + } while ((result = checkerQueue.poll(waitMs, TimeUnit.MILLISECONDS)) == null); + + // put it back on the queue + checkerQueue.add(result); + + } catch (InterruptedException e) { + logger.error("interrupted waiting for topic sink {} source {}", sink.getTopic(), source.getTopic(), e); + Thread.currentThread().interrupt(); + checkerQueue.add(Boolean.FALSE); + + } finally { + source.unregister(listener); + } + + return checkerQueue.peek(); + } + + @NotNull + private <T> TopicListener getTopicListener(T message) { + @SuppressWarnings("unchecked") + final Class<? extends T> clazz = (Class<? extends T>) message.getClass(); + + // create a listener to detect when a matching message is received + return (infra, topic, msg) -> { + try { + T incoming = decode(msg, clazz); + + if (message.equals(incoming)) { + logger.info("topic {} is ready; found matching message {}", topic, incoming); + checkerQueue.add(Boolean.TRUE); + } + + } catch (CoderException e) { + logger.warn("cannot decode message from topic {}", topic, e); + decodeFailed(); + } + }; + } + + /** + * Stops any listeners that are currently stuck in {@link #awaitReady(Object)} by + * adding {@code false} to the queue. + */ + public void stopWaiting() { + checkerQueue.add(Boolean.FALSE); + } + // these may be overridden by junit tests protected TopicEndpoint getTopicEndpointManager() { return TopicEndpointManager.getManager(); } + + protected <T> T decode(String msg, Class<? extends T> clazz) throws CoderException { + return coder.decode(msg, clazz); + } + + protected void decodeFailed() { + // already logged - nothing else to do + } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientException.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientException.java index 3a5d727b..1037d3af 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientException.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientException.java @@ -3,6 +3,7 @@ * ONAP * ================================================================================ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,10 +21,13 @@ package org.onap.policy.common.endpoints.event.comm.client; +import java.io.Serial; + /** * Exception thrown by BidirectionalTopicClient class. */ public class BidirectionalTopicClientException extends Exception { + @Serial private static final long serialVersionUID = 1L; public BidirectionalTopicClientException() { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java index 9f8b3c06..0ccc8a75 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java @@ -3,7 +3,7 @@ * ONAP PAP * ================================================================================ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019, 2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory; /** * Client for sending messages to a Topic using TopicSink. */ +@Getter public class TopicSinkClient { private static final Logger logger = LoggerFactory.getLogger(TopicSinkClient.class); @@ -46,7 +47,6 @@ public class TopicSinkClient { /** * Where messages are published. */ - @Getter private final TopicSink sink; /** @@ -56,9 +56,9 @@ public class TopicSinkClient { * @throws TopicSinkClientException if the topic does not exist */ public TopicSinkClient(final String topic) throws TopicSinkClientException { - final List<TopicSink> lst = getTopicSinks(topic); + final List<TopicSink> lst = getTopicSinks(topic.toLowerCase()); if (lst.isEmpty()) { - throw new TopicSinkClientException("no sinks for topic: " + topic); + throw new TopicSinkClientException("no sinks for topic: " + topic.toLowerCase()); } this.sink = lst.get(0); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientException.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientException.java index 608393b3..431d4f34 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientException.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientException.java @@ -3,7 +3,7 @@ * ONAP PAP * ================================================================================ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019, 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,10 +21,13 @@ package org.onap.policy.common.endpoints.event.comm.client; +import java.io.Serial; + /** * Exception thrown by TopicSink client classes. */ public class TopicSinkClientException extends Exception { + @Serial private static final long serialVersionUID = 1L; public TopicSinkClientException() { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/features/NetLoggerFeatureProviders.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/features/NetLoggerFeatureProviders.java index 8b09f386..ba84b551 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/features/NetLoggerFeatureProviders.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/features/NetLoggerFeatureProviders.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,12 +20,15 @@ package org.onap.policy.common.endpoints.features; +import lombok.AccessLevel; import lombok.Getter; +import lombok.NoArgsConstructor; import org.onap.policy.common.utils.services.OrderedServiceImpl; /** * Providers for network logging feature. */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) public class NetLoggerFeatureProviders { /** @@ -34,8 +37,4 @@ public class NetLoggerFeatureProviders { @Getter private static final OrderedServiceImpl<NetLoggerFeatureApi> providers = new OrderedServiceImpl<>(NetLoggerFeatureApi.class); - - private NetLoggerFeatureProviders() { - // do nothing - } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClient.java index 8a133155..01deaa2d 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClient.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClient.java @@ -3,6 +3,7 @@ * ONAP * ================================================================================ * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,12 +21,12 @@ package org.onap.policy.common.endpoints.http.client; +import jakarta.ws.rs.client.Entity; +import jakarta.ws.rs.client.InvocationCallback; +import jakarta.ws.rs.client.WebTarget; +import jakarta.ws.rs.core.Response; import java.util.Map; import java.util.concurrent.Future; -import javax.ws.rs.client.Entity; -import javax.ws.rs.client.InvocationCallback; -import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.Response; import org.onap.policy.common.capabilities.Startable; /** @@ -193,9 +194,9 @@ public interface HttpClient extends Startable { String getBasePath(); /** - * Get the user name. + * Get the username. * - * @return the user name + * @return the username */ String getUserName(); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClientConfigException.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClientConfigException.java index 98ec576f..bb871fa0 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClientConfigException.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClientConfigException.java @@ -3,6 +3,7 @@ * ONAP * ================================================================================ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,10 +21,13 @@ package org.onap.policy.common.endpoints.http.client; +import java.io.Serial; + /** * Exception generated by HttpClient builder. */ public class HttpClientConfigException extends Exception { + @Serial private static final long serialVersionUID = 1L; public HttpClientConfigException() { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClientFactoryInstance.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClientFactoryInstance.java index c2921640..f64be30e 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClientFactoryInstance.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClientFactoryInstance.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,8 +20,11 @@ package org.onap.policy.common.endpoints.http.client; +import lombok.AccessLevel; import lombok.Getter; +import lombok.NoArgsConstructor; +@NoArgsConstructor(access = AccessLevel.PRIVATE) public class HttpClientFactoryInstance { /** @@ -29,9 +32,4 @@ public class HttpClientFactoryInstance { */ @Getter private static final HttpClientFactory clientFactory = new IndexedHttpClientFactory(); - - - private HttpClientFactoryInstance() { - // do nothing - } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/IndexedHttpClientFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/IndexedHttpClientFactory.java index edf8ff6f..5f0b1d6e 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/IndexedHttpClientFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/IndexedHttpClientFactory.java @@ -2,7 +2,8 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +21,7 @@ package org.onap.policy.common.endpoints.http.client; +import com.google.re2j.Pattern; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; @@ -38,11 +40,12 @@ import org.slf4j.LoggerFactory; * HTTP client factory implementation indexed by name. */ class IndexedHttpClientFactory implements HttpClientFactory { + private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*"); /** * Logger. */ - private static Logger logger = LoggerFactory.getLogger(IndexedHttpClientFactory.class); + private static final Logger logger = LoggerFactory.getLogger(IndexedHttpClientFactory.class); protected HashMap<String, HttpClient> clients = new HashMap<>(); @@ -75,7 +78,7 @@ class IndexedHttpClientFactory implements HttpClientFactory { return clientList; } - for (String clientName : clientNames.split("\\s*,\\s*")) { + for (String clientName : COMMA_SPACE_PAT.split(clientNames)) { addClient(clientList, clientName, properties); } @@ -85,23 +88,22 @@ class IndexedHttpClientFactory implements HttpClientFactory { private void addClient(ArrayList<HttpClient> clientList, String clientName, Properties properties) { String clientPrefix = PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + clientName; - PropertyUtils props = new PropertyUtils(properties, clientPrefix, + var props = new PropertyUtils(properties, clientPrefix, (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for http client {} ", this, name, value, clientName)); - int port = props.getInteger(PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX, -1); + var port = props.getInteger(PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX, -1); if (port < 0) { logger.warn("No HTTP port for client in {}", clientName); return; } - boolean https = props.getBoolean(PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX, false); - try { HttpClient client = this.build(BusTopicParams.builder() .clientName(clientName) - .useHttps(https) - .allowSelfSignedCerts(https) + .useHttps(props.getBoolean(PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX, false)) + .allowSelfSignedCerts( + props.getBoolean(PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX, false)) .hostname(props.getString(PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX, null)) .port(port) .basePath(props.getString(PolicyEndPointProperties.PROPERTY_HTTP_URL_SUFFIX, null)) diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java index f4f0705c..130b6c15 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java @@ -2,9 +2,9 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019, 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,14 @@ package org.onap.policy.common.endpoints.http.client.internal; +import com.google.re2j.Pattern; +import jakarta.ws.rs.client.Client; +import jakarta.ws.rs.client.ClientBuilder; +import jakarta.ws.rs.client.Entity; +import jakarta.ws.rs.client.Invocation.Builder; +import jakarta.ws.rs.client.InvocationCallback; +import jakarta.ws.rs.client.WebTarget; +import jakarta.ws.rs.core.Response; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; @@ -30,19 +38,13 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.Future; import javax.net.ssl.SSLContext; -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.Entity; -import javax.ws.rs.client.Invocation.Builder; -import javax.ws.rs.client.InvocationCallback; -import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.Response; +import lombok.Getter; +import lombok.ToString; import org.apache.commons.lang3.StringUtils; import org.glassfish.jersey.client.ClientProperties; import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.http.client.HttpClient; -import org.onap.policy.common.gson.annotation.GsonJsonIgnore; import org.onap.policy.common.utils.network.NetworkUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,12 +52,15 @@ import org.slf4j.LoggerFactory; /** * Http Client implementation using a Jersey Client. */ +@Getter +@ToString public class JerseyClient implements HttpClient { + private static final Pattern COMMA_PAT = Pattern.compile(","); /** * Logger. */ - private static Logger logger = LoggerFactory.getLogger(JerseyClient.class); + private static final Logger logger = LoggerFactory.getLogger(JerseyClient.class); protected static final String JERSEY_DEFAULT_SERIALIZATION_PROVIDER = "org.onap.policy.common.gson.GsonMessageBodyHandler"; @@ -77,9 +82,14 @@ public class JerseyClient implements HttpClient { /** * Constructor. * - * <p>name the name https is it https or not selfSignedCerts are there self signed certs - * hostname the hostname port port being used basePath base context userName user - * password password + * <p>name - the name + * https - is it https or not + * selfSignedCerts - are there self-signed certs + * hostname - the hostname + * port - port being used + * basePath - base context + * userName - user credentials + * password - password credentials * * @param busTopicParams Input parameters object * @throws KeyManagementException key exception @@ -112,7 +122,7 @@ public class JerseyClient implements HttpClient { this.client = detmClient(); if (!StringUtils.isBlank(this.userName) && !StringUtils.isBlank(this.password)) { - HttpAuthenticationFeature authFeature = HttpAuthenticationFeature.basic(userName, password); + var authFeature = HttpAuthenticationFeature.basic(userName, password); this.client.register(authFeature); } @@ -127,13 +137,13 @@ public class JerseyClient implements HttpClient { private Client detmClient() throws NoSuchAlgorithmException, KeyManagementException { if (this.https) { ClientBuilder clientBuilder; - SSLContext sslContext = SSLContext.getInstance("TLSv1.2"); + var sslContext = SSLContext.getInstance("TLSv1.2"); if (this.selfSignedCerts) { sslContext.init(null, NetworkUtil.getAlwaysTrustingManager(), new SecureRandom()); - // This falls under self signed certs which is used for non-production testing environments where + // This falls under self-signed certs which is used for non-production testing environments where // the hostname in the cert is unlikely to be crafted properly. We always return true for the - // hostname verifier. This causes a sonar vuln but we ignore it as it could cause problems in some + // hostname verifier. This causes a sonar vuln, but we ignore it as it could cause problems in some // testing environments. clientBuilder = ClientBuilder.newBuilder().sslContext(sslContext).hostnameVerifier( @@ -158,7 +168,7 @@ public class JerseyClient implements HttpClient { private void registerSerProviders(String serializationProvider) throws ClassNotFoundException { String providers = (StringUtils.isBlank(serializationProvider) ? JERSEY_DEFAULT_SERIALIZATION_PROVIDER : serializationProvider); - for (String prov : providers.split(",")) { + for (String prov : COMMA_PAT.split(providers)) { this.client.register(Class.forName(prov)); } } @@ -195,7 +205,7 @@ public class JerseyClient implements HttpClient { @Override public Future<Response> get(InvocationCallback<Response> callback, Map<String, Object> headers) { - Builder builder = getWebTarget().request(); + var builder = getWebTarget().request(); if (headers != null) { headers.forEach(builder::header); } @@ -262,83 +272,8 @@ public class JerseyClient implements HttpClient { return this.alive; } - @Override - public String getName() { - return name; - } - - @Override - public boolean isHttps() { - return https; - } - - @Override - public boolean isSelfSignedCerts() { - return selfSignedCerts; - } - - @Override - public String getHostname() { - return hostname; - } - - @Override - public int getPort() { - return port; - } - - @Override - public String getBasePath() { - return basePath; - } - - @Override - public String getUserName() { - return userName; - } - - @GsonJsonIgnore - @Override - public String getPassword() { - return password; - } - - @Override - public String getBaseUrl() { - return baseUrl; - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("JerseyClient [name="); - builder.append(name); - builder.append(", https="); - builder.append(https); - builder.append(", selfSignedCerts="); - builder.append(selfSignedCerts); - builder.append(", hostname="); - builder.append(hostname); - builder.append(", port="); - builder.append(port); - builder.append(", basePath="); - builder.append(basePath); - builder.append(", userName="); - builder.append(userName); - builder.append(", password="); - builder.append(password); - builder.append(", client="); - builder.append(client); - builder.append(", baseUrl="); - builder.append(baseUrl); - builder.append(", alive="); - builder.append(alive); - builder.append("]"); - return builder.toString(); - } - private Builder getBuilder(String path, Map<String, Object> headers) { - Builder builder = getWebTarget().path(path).request(); + var builder = getWebTarget().path(path).request(); for (Entry<String, Object> header : headers.entrySet()) { builder.header(header.getKey(), header.getValue()); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/AuthorizationFilter.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/AuthorizationFilter.java index d884b869..44204cfd 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/AuthorizationFilter.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/AuthorizationFilter.java @@ -3,6 +3,7 @@ * ONAP * ================================================================================ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,36 +21,33 @@ package org.onap.policy.common.endpoints.http.server; +import jakarta.servlet.Filter; +import jakarta.servlet.FilterChain; +import jakarta.servlet.ServletException; +import jakarta.servlet.ServletRequest; +import jakarta.servlet.ServletResponse; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; import java.io.IOException; -import javax.servlet.Filter; -import javax.servlet.FilterChain; -import javax.servlet.ServletException; -import javax.servlet.ServletRequest; -import javax.servlet.ServletResponse; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public abstract class AuthorizationFilter implements Filter { - private static Logger logger = LoggerFactory.getLogger(AuthorizationFilter.class); + private static final Logger logger = LoggerFactory.getLogger(AuthorizationFilter.class); @Override public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException { - if (!(servletRequest instanceof HttpServletRequest)) { + if (!(servletRequest instanceof HttpServletRequest request)) { throw new ServletException("Not an HttpServletRequest instance"); } - if (!(servletResponse instanceof HttpServletResponse)) { + if (!(servletResponse instanceof HttpServletResponse response)) { throw new ServletException("Not an HttpServletResponse instance"); } - HttpServletRequest request = (HttpServletRequest) servletRequest; - HttpServletResponse response = (HttpServletResponse) servletResponse; - String role = getRole(request); boolean authorized = request.isUserInRole(role); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServer.java index 49dfea06..a20c125d 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServer.java @@ -3,7 +3,8 @@ * ONAP * ================================================================================ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2020 Nordix Foundation. + * Modifications Copyright (C) 2020, 2024 Nordix Foundation. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,18 +52,6 @@ public interface HttpServletServer extends Startable { void setBasicAuthentication(String user, String password, String relativeUriPath); /** - * Enables AAF based authentication. - * - * @param filterPath filter path - */ - void setAafAuthentication(String filterPath); - - /** - * Checks if AAF authentication has been enabled. - */ - boolean isAaf(); - - /** * Sets the serialization provider to be used when classes are added to the service. * * @param provider the provider to use for message serialization and de-serialization @@ -90,6 +79,14 @@ public interface HttpServletServer extends Startable { void addServletClass(String servletPath, String restClass); /** + * Adds a Java Servlet. + * + * @param servletPath servlet path + * @param plainServletClass servlet class + */ + void addStdServletClass(String servletPath, String plainServletClass); + + /** * Adds a package containing JAX-RS classes to serve REST requests. * * @param servletPath servlet path @@ -123,4 +120,14 @@ public interface HttpServletServer extends Startable { * @throws InterruptedException if the blocking operation is interrupted */ boolean waitedStart(long maxWaitTime) throws InterruptedException; + + /** + * Are prometheus metrics enabled?. + */ + public boolean isPrometheus(); + + /** + * Enable prometheus metrics. + */ + public void setPrometheus(String metricsPath); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServerFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServerFactory.java index 90c0db2e..7ce0becd 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServerFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServerFactory.java @@ -3,7 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2020 Nordix Foundation. + * Modifications Copyright (C) 2020,2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,23 +30,24 @@ import java.util.Properties; public interface HttpServletServerFactory { /** - * Builds an http or https rest server with support for servlets. + * Builds a http or https rest server with support for servlets. * * @param name name * @param https use secured http over tls connection * @param host binding host * @param port port + * @param sniHostCheck SNI Host checking flag * @param contextPath server base path * @param swagger enable swagger documentation * @param managed is it managed by infrastructure * @return http server * @throws IllegalArgumentException when invalid parameters are provided */ - HttpServletServer build(String name, boolean https, String host, int port, String contextPath, boolean swagger, - boolean managed); + HttpServletServer build(String name, boolean https, String host, int port, boolean sniHostCheck, String contextPath, + boolean swagger, boolean managed); /** - * Builds an http rest server with support for servlets. + * Builds a http rest server with support for servlets. * * @param name name * @param host binding host @@ -69,19 +70,20 @@ public interface HttpServletServerFactory { List<HttpServletServer> build(Properties properties); /** - * Builds an http or https server to manage static resources. + * Builds a http or https server to manage static resources. * * @param name name * @param https use secured http over tls connection * @param host binding host * @param port port + * @param sniHostCheck SNI Host checking flag * @param contextPath server base path * @param managed is it managed by infrastructure * @return http server * @throws IllegalArgumentException when invalid parameters are provided */ - HttpServletServer buildStaticResourceServer(String name, boolean https, String host, int port, String contextPath, - boolean managed); + HttpServletServer buildStaticResourceServer(String name, boolean https, String host, int port, boolean sniHostCheck, + String contextPath, boolean managed); /** * Gets a server based on the port. diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServerFactoryInstance.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServerFactoryInstance.java index a56be701..58e27cbf 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServerFactoryInstance.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServerFactoryInstance.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,8 +20,11 @@ package org.onap.policy.common.endpoints.http.server; +import lombok.AccessLevel; import lombok.Getter; +import lombok.NoArgsConstructor; +@NoArgsConstructor(access = AccessLevel.PRIVATE) public class HttpServletServerFactoryInstance { /** @@ -29,10 +32,4 @@ public class HttpServletServerFactoryInstance { */ @Getter private static final HttpServletServerFactory serverFactory = new IndexedHttpServletServerFactory(); - - - private HttpServletServerFactoryInstance() { - // do nothing - } - } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java index 445a3515..7c9aca4c 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java @@ -2,8 +2,9 @@ * ============LICENSE_START======================================================= * ONAP Policy Engine - Common Modules * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2020 Nordix Foundation. + * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2020,2023-2024 Nordix Foundation. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +22,7 @@ package org.onap.policy.common.endpoints.http.server; +import com.google.re2j.Pattern; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -37,8 +39,7 @@ import org.slf4j.LoggerFactory; * Indexed factory implementation. */ class IndexedHttpServletServerFactory implements HttpServletServerFactory { - - private static final String SPACES_COMMA_SPACES = "\\s*,\\s*"; + private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*"); /** * logger. @@ -51,14 +52,14 @@ class IndexedHttpServletServerFactory implements HttpServletServerFactory { protected HashMap<Integer, HttpServletServer> servers = new HashMap<>(); @Override - public synchronized HttpServletServer build(String name, boolean https, String host, int port, String contextPath, - boolean swagger, boolean managed) { + public synchronized HttpServletServer build(String name, boolean https, String host, int port, boolean sniHostCheck, + String contextPath, boolean swagger, boolean managed) { if (servers.containsKey(port)) { return servers.get(port); } - JettyJerseyServer server = new JettyJerseyServer(name, https, host, port, contextPath, swagger); + var server = new JettyJerseyServer(name, https, host, port, sniHostCheck, contextPath, swagger); if (managed) { servers.put(port, server); } @@ -69,7 +70,7 @@ class IndexedHttpServletServerFactory implements HttpServletServerFactory { @Override public synchronized HttpServletServer build(String name, String host, int port, String contextPath, boolean swagger, boolean managed) { - return build(name, false, host, port, contextPath, swagger, managed); + return build(name, false, host, port, false, contextPath, swagger, managed); } @Override @@ -83,23 +84,21 @@ class IndexedHttpServletServerFactory implements HttpServletServerFactory { return serviceList; } - for (String serviceName : serviceNames.split(SPACES_COMMA_SPACES)) { + for (String serviceName : COMMA_SPACE_PAT.split(serviceNames)) { addService(serviceList, serviceName, properties); } return serviceList; } - - @Override public HttpServletServer buildStaticResourceServer(String name, boolean https, String host, int port, - String contextPath, boolean managed) { + boolean sniHostCheck, String contextPath, boolean managed) { if (servers.containsKey(port)) { return servers.get(port); } - JettyStaticResourceServer server = new JettyStaticResourceServer(name, https, host, port, contextPath); + var server = new JettyStaticResourceServer(name, https, host, port, sniHostCheck, contextPath); if (managed) { servers.put(port, server); } @@ -111,92 +110,106 @@ class IndexedHttpServletServerFactory implements HttpServletServerFactory { String servicePrefix = PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + serviceName; - PropertyUtils props = new PropertyUtils(properties, servicePrefix, + var props = new PropertyUtils(properties, servicePrefix, (name, value, ex) -> logger - .warn("{}: {} {} is in invalid format for http service {} ", this, name, value, serviceName)); + .warn("{}: {} {} is in invalid format for http service {} ", this, name, value, serviceName)); - int servicePort = props.getInteger(PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX, -1); + var servicePort = props.getInteger(PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX, -1); if (servicePort < 0) { logger.warn("No HTTP port for service in {}", serviceName); return; } - final String hostName = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX, null); - final String contextUriPath = - props.getString(PolicyEndPointProperties.PROPERTY_HTTP_CONTEXT_URIPATH_SUFFIX, null); - boolean managed = props.getBoolean(PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, true); - boolean swagger = props.getBoolean(PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX, false); - boolean https = props.getBoolean(PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX, false); + final var hostName = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX, null); + final var contextUriPath = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_CONTEXT_URIPATH_SUFFIX, null); + var managed = props.getBoolean(PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, true); + var swagger = props.getBoolean(PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX, false); + var https = props.getBoolean(PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX, false); + var sniHostCheck = props.getBoolean(PolicyEndPointProperties.PROPERTY_HTTP_SNI_HOST_CHECK_SUFFIX, false); // create the service - HttpServletServer service = build(serviceName, https, hostName, servicePort, contextUriPath, swagger, managed); + HttpServletServer service = + build(serviceName, https, hostName, servicePort, sniHostCheck, contextUriPath, swagger, managed); // configure the service setSerializationProvider(props, service); - setAuthentication(props, service, contextUriPath); + setAuthentication(props, service); - final String restUriPath = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_REST_URIPATH_SUFFIX, null); + final var restUriPath = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_REST_URIPATH_SUFFIX, null); addFilterClasses(props, service, restUriPath); - addServletClasses(props, service, restUriPath); + addRestServletClasses(props, service, restUriPath); addServletPackages(props, service, restUriPath); + addServletClass(props, service); + setPrometheus(props, service); + serviceList.add(service); } private void setSerializationProvider(PropertyUtils props, HttpServletServer service) { - final String classProv = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER, null); + final var classProv = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER, null); if (!StringUtils.isBlank(classProv)) { service.setSerializationProvider(classProv); } } - private void setAuthentication(PropertyUtils props, HttpServletServer service, final String contextUriPath) { - /* authentication method either AAF or HTTP Basic Auth */ + private void setAuthentication(PropertyUtils props, HttpServletServer service) { + /* authentication method HTTP Basic Auth */ + final var userName = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, null); + final var password = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, null); + final var authUriPath = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_AUTH_URIPATH_SUFFIX, null); - boolean aaf = props.getBoolean(PolicyEndPointProperties.PROPERTY_AAF_SUFFIX, false); - final String userName = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, null); - final String password = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, null); - final String authUriPath = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_AUTH_URIPATH_SUFFIX, null); - - if (aaf) { - service.setAafAuthentication(contextUriPath); - } else if (!StringUtils.isBlank(userName) && !StringUtils.isBlank(password)) { + if (!StringUtils.isBlank(userName) && !StringUtils.isBlank(password)) { service.setBasicAuthentication(userName, password, authUriPath); } } + private void setPrometheus(PropertyUtils props, HttpServletServer service) { + if (props.getBoolean(PolicyEndPointProperties.PROPERTY_HTTP_PROMETHEUS_SUFFIX, false)) { + service.setPrometheus("/metrics"); + } + } + private void addFilterClasses(PropertyUtils props, HttpServletServer service, final String restUriPath) { - final String filterClasses = - props.getString(PolicyEndPointProperties.PROPERTY_HTTP_FILTER_CLASSES_SUFFIX, null); + final var filterClasses = + props.getString(PolicyEndPointProperties.PROPERTY_HTTP_FILTER_CLASSES_SUFFIX, null); if (!StringUtils.isBlank(filterClasses)) { - for (String filterClass : filterClasses.split(SPACES_COMMA_SPACES)) { + for (String filterClass : COMMA_SPACE_PAT.split(filterClasses)) { service.addFilterClass(restUriPath, filterClass); } } } - private void addServletClasses(PropertyUtils props, HttpServletServer service, final String restUriPath) { - - final String restClasses = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX, null); + private void addRestServletClasses(PropertyUtils props, HttpServletServer service, final String restUriPath) { + final var restClasses = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX, null); if (!StringUtils.isBlank(restClasses)) { - for (String restClass : restClasses.split(SPACES_COMMA_SPACES)) { + for (String restClass : COMMA_SPACE_PAT.split(restClasses)) { service.addServletClass(restUriPath, restClass); } } } + private void addServletClass(PropertyUtils props, HttpServletServer service) { + var servletClass = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_SERVLET_CLASS_SUFFIX, null); + var servletUriPath = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_SERVLET_URIPATH_SUFFIX, null); + + if (!StringUtils.isBlank(servletClass) && !StringUtils.isBlank(servletUriPath)) { + service.addStdServletClass(servletUriPath, servletClass); + } + } + private void addServletPackages(PropertyUtils props, HttpServletServer service, final String restUriPath) { - final String restPackages = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_REST_PACKAGES_SUFFIX, null); + final var restPackages = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_REST_PACKAGES_SUFFIX, null); if (!StringUtils.isBlank(restPackages)) { - for (String restPackage : restPackages.split(SPACES_COMMA_SPACES)) { + for (String restPackage : COMMA_SPACE_PAT.split(restPackages)) { service.addServletPackage(restUriPath, restPackage); } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/JsonExceptionMapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/JsonExceptionMapper.java index 55b3a0d5..0030c121 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/JsonExceptionMapper.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/JsonExceptionMapper.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,11 +22,12 @@ package org.onap.policy.common.endpoints.http.server; import com.google.gson.JsonSyntaxException; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.ext.ExceptionMapper; -import javax.ws.rs.ext.Provider; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.ext.ExceptionMapper; +import jakarta.ws.rs.ext.Provider; +import lombok.AllArgsConstructor; import lombok.Getter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +39,7 @@ import org.slf4j.LoggerFactory; @Provider @Produces(MediaType.APPLICATION_JSON) public class JsonExceptionMapper implements ExceptionMapper<JsonSyntaxException> { - private static Logger logger = LoggerFactory.getLogger(JsonExceptionMapper.class); + private static final Logger logger = LoggerFactory.getLogger(JsonExceptionMapper.class); @Override public Response toResponse(JsonSyntaxException exception) { @@ -46,11 +48,8 @@ public class JsonExceptionMapper implements ExceptionMapper<JsonSyntaxException> } @Getter + @AllArgsConstructor private static class SimpleResponse { private String errorDetails; - - public SimpleResponse(String errorDetails) { - this.errorDetails = errorDetails; - } } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/RestServer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/RestServer.java index 6776a328..7e6ce866 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/RestServer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/RestServer.java @@ -1,7 +1,8 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2019 Nordix Foundation. - * Modifications Copyright (C) 2019-2020 AT&T Intellectual Property. + * Copyright (C) 2019, 2023-2024 Nordix Foundation. + * Modifications Copyright (C) 2019-2021 AT&T Intellectual Property. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,9 +22,13 @@ package org.onap.policy.common.endpoints.http.server; +import jakarta.servlet.Filter; +import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.Properties; -import org.onap.policy.common.endpoints.http.server.aaf.AafAuthFilter; +import java.util.stream.Collectors; +import lombok.ToString; import org.onap.policy.common.endpoints.parameters.RestServerParameters; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.onap.policy.common.gson.GsonMessageBodyHandler; @@ -34,6 +39,7 @@ import org.onap.policy.common.utils.services.ServiceManagerContainer; * * @author Ram Krishna Verma (ram.krishna.verma@est.tech) */ +@ToString public class RestServer extends ServiceManagerContainer { /** @@ -47,24 +53,34 @@ public class RestServer extends ServiceManagerContainer { * Constructs the object. * * @param restServerParameters the rest server parameters - * @param aafFilter class of object to use to filter AAF requests, or {@code null} * @param jaxrsProviders classes providing the services */ - public RestServer(final RestServerParameters restServerParameters, Class<? extends AafAuthFilter> aafFilter, - Class<?>... jaxrsProviders) { + public RestServer(final RestServerParameters restServerParameters, + Class<?>... jaxrsProviders) { + this(restServerParameters, null, Arrays.asList(jaxrsProviders)); + } + + /** + * Constructs the object. + * + * @param restServerParameters the rest server parameters + * @param filters class of object to use to filter requests, or {@code null} + * @param jaxrsProviders classes providing the services + */ + public RestServer(final RestServerParameters restServerParameters, List<Class<? extends Filter>> filters, + List<Class<?>> jaxrsProviders) { - if (jaxrsProviders.length == 0) { + if (jaxrsProviders == null || jaxrsProviders.isEmpty()) { throw new IllegalArgumentException("no providers specified"); } this.servers = factory.getServerFactory() - .build(getServerProperties(restServerParameters, getProviderClassNames(jaxrsProviders))); + .build(getServerProperties(restServerParameters, getProviderClassNames(jaxrsProviders))); for (HttpServletServer server : this.servers) { - if (aafFilter != null && server.isAaf()) { - server.addFilterClass(null, aafFilter.getName()); + if (filters != null && !filters.isEmpty()) { + filters.forEach(filter -> server.addFilterClass(null, filter.getName())); } - addAction("REST " + server.getName(), server::start, server::stop); } } @@ -78,15 +94,15 @@ public class RestServer extends ServiceManagerContainer { * @return the properties object */ protected Properties getServerProperties(RestServerParameters restServerParameters, String names) { - final Properties props = new Properties(); + final var props = new Properties(); props.setProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES, restServerParameters.getName()); final String svcpfx = - PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + restServerParameters.getName(); + PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + restServerParameters.getName(); props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX, restServerParameters.getHost()); props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX, - Integer.toString(restServerParameters.getPort())); + Integer.toString(restServerParameters.getPort())); props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX, names); props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false"); props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX, "true"); @@ -95,12 +111,19 @@ public class RestServer extends ServiceManagerContainer { props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, getValue(restServerParameters.getPassword())); props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX, - String.valueOf(restServerParameters.isHttps())); - props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_AAF_SUFFIX, - String.valueOf(restServerParameters.isAaf())); + String.valueOf(restServerParameters.isHttps())); + props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SNI_HOST_CHECK_SUFFIX, + String.valueOf(restServerParameters.isSniHostCHeck())); props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER, - String.join(",", GsonMessageBodyHandler.class.getName(), YamlMessageBodyHandler.class.getName(), - JsonExceptionMapper.class.getName(), YamlExceptionMapper.class.getName())); + String.join(",", GsonMessageBodyHandler.class.getName(), YamlMessageBodyHandler.class.getName(), + JsonExceptionMapper.class.getName(), YamlExceptionMapper.class.getName())); + + props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SERVLET_URIPATH_SUFFIX, + Optional.ofNullable(restServerParameters.getServletUriPath()).orElse("")); + props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SERVLET_CLASS_SUFFIX, + Optional.ofNullable(restServerParameters.getServletClass()).orElse("")); + props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_PROMETHEUS_SUFFIX, + String.valueOf(restServerParameters.isPrometheus())); return props; } @@ -110,18 +133,8 @@ public class RestServer extends ServiceManagerContainer { * @param jaxrsProviders classes providing the services * @return the provider class names */ - private String getProviderClassNames(Class<?>[] jaxrsProviders) { - StringBuilder names = new StringBuilder(); - - for (Class<?> prov : jaxrsProviders) { - if (names.length() > 0) { - names.append(','); - } - - names.append(prov.getName()); - } - - return names.toString(); + private String getProviderClassNames(List<Class<?>> jaxrsProviders) { + return jaxrsProviders.stream().map(Class::getName).collect(Collectors.joining(",")); } private String getValue(final String value) { @@ -131,11 +144,6 @@ public class RestServer extends ServiceManagerContainer { return value; } - @Override - public String toString() { - return "RestServer [servers=" + servers + "]"; - } - /** * Factory used to access objects. */ diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/YamlExceptionMapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/YamlExceptionMapper.java index 7eac932a..c30c2072 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/YamlExceptionMapper.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/YamlExceptionMapper.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,10 +21,11 @@ package org.onap.policy.common.endpoints.http.server; -import javax.ws.rs.Produces; -import javax.ws.rs.core.Response; -import javax.ws.rs.ext.ExceptionMapper; -import javax.ws.rs.ext.Provider; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.ext.ExceptionMapper; +import jakarta.ws.rs.ext.Provider; +import lombok.AllArgsConstructor; import lombok.Getter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +38,7 @@ import org.yaml.snakeyaml.error.YAMLException; @Provider @Produces(YamlMessageBodyHandler.APPLICATION_YAML) public class YamlExceptionMapper implements ExceptionMapper<YAMLException> { - private static Logger logger = LoggerFactory.getLogger(YamlExceptionMapper.class); + private static final Logger logger = LoggerFactory.getLogger(YamlExceptionMapper.class); @Override public Response toResponse(YAMLException exception) { @@ -45,11 +47,8 @@ public class YamlExceptionMapper implements ExceptionMapper<YAMLException> { } @Getter + @AllArgsConstructor private static class SimpleResponse { private String errorDetails; - - public SimpleResponse(String errorDetails) { - this.errorDetails = errorDetails; - } } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/YamlJacksonHandler.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/YamlJacksonHandler.java index f71aa90f..4639e6a2 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/YamlJacksonHandler.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/YamlJacksonHandler.java @@ -3,6 +3,7 @@ * ONAP * ================================================================================ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,10 +22,10 @@ package org.onap.policy.common.endpoints.http.server; import com.google.gson.GsonBuilder; -import javax.ws.rs.Consumes; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.ext.Provider; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.ext.Provider; import org.onap.policy.common.gson.JacksonHandler; import org.onap.policy.common.utils.coder.YamlJsonTranslator; diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/YamlMessageBodyHandler.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/YamlMessageBodyHandler.java index 8506a283..6de6b754 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/YamlMessageBodyHandler.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/YamlMessageBodyHandler.java @@ -2,7 +2,8 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +23,13 @@ package org.onap.policy.common.endpoints.http.server; import com.google.gson.GsonBuilder; import com.google.gson.JsonSyntaxException; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.MultivaluedMap; +import jakarta.ws.rs.ext.MessageBodyReader; +import jakarta.ws.rs.ext.MessageBodyWriter; +import jakarta.ws.rs.ext.Provider; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -30,13 +38,6 @@ import java.io.OutputStreamWriter; import java.lang.annotation.Annotation; import java.lang.reflect.Type; import java.nio.charset.StandardCharsets; -import javax.ws.rs.Consumes; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.ext.MessageBodyReader; -import javax.ws.rs.ext.MessageBodyWriter; -import javax.ws.rs.ext.Provider; import org.onap.policy.common.gson.GsonMessageBodyHandler; import org.onap.policy.common.utils.coder.YamlJsonTranslator; import org.slf4j.Logger; @@ -96,7 +97,7 @@ public class YamlMessageBodyHandler implements MessageBodyReader<Object>, Messag public void writeTo(Object object, Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType, MultivaluedMap<String, Object> httpHeaders, OutputStream entityStream) throws IOException { - try (OutputStreamWriter writer = new OutputStreamWriter(entityStream, StandardCharsets.UTF_8)) { + try (var writer = new OutputStreamWriter(entityStream, StandardCharsets.UTF_8)) { translator.toYaml(writer, object); } } @@ -121,7 +122,7 @@ public class YamlMessageBodyHandler implements MessageBodyReader<Object>, Messag public Object readFrom(Class<Object> type, Type genericType, Annotation[] annotations, MediaType mediaType, MultivaluedMap<String, String> httpHeaders, InputStream entityStream) throws IOException { - try (InputStreamReader streamReader = new InputStreamReader(entityStream, StandardCharsets.UTF_8)) { + try (var streamReader = new InputStreamReader(entityStream, StandardCharsets.UTF_8)) { Class<?> clazz = (Class<?>) genericType; return translator.fromYaml(streamReader, clazz); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/aaf/AafAuthFilter.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/aaf/AafAuthFilter.java deleted file mode 100644 index df1f6044..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/aaf/AafAuthFilter.java +++ /dev/null @@ -1,45 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.http.server.aaf; - -import javax.servlet.http.HttpServletRequest; -import org.onap.policy.common.endpoints.http.server.AuthorizationFilter; - -/** - * Generic Authorization AAF Filter Skeleton. This class will return - * a permission in AAF format. Subclasses are responsible to provide - * the AAF permission type and instance. - */ -public abstract class AafAuthFilter extends AuthorizationFilter { - - public static final String DEFAULT_NAMESPACE = "org.onap.policy"; - - @Override - protected String getRole(HttpServletRequest request) { - return - String.format("%s|%s|%s", getPermissionType(request), getPermissionInstance(request), - request.getMethod().toLowerCase()); - } - - protected abstract String getPermissionType(HttpServletRequest request); - - protected abstract String getPermissionInstance(HttpServletRequest request); -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/aaf/AafGranularAuthFilter.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/aaf/AafGranularAuthFilter.java deleted file mode 100644 index 27b15a9c..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/aaf/AafGranularAuthFilter.java +++ /dev/null @@ -1,48 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.http.server.aaf; - -import javax.servlet.http.HttpServletRequest; -import org.onap.policy.common.utils.network.NetworkUtil; - -/** - * This generic class allows the mapping of REST APIs to AAF permissions - * to be evaluated in an AAF context. This class can be used for - * highly granular permissions where each REST resource can be directly - * mapped transparently to an AAF permission type, the instance being the host - * server, and the HTTP method corresponding to the action. - * Subclasses are responsible to provide the root permission prefix, typically - * the namespace. - */ -public abstract class AafGranularAuthFilter extends AafAuthFilter { - - @Override - protected String getPermissionType(HttpServletRequest request) { - return getPermissionTypeRoot() + request.getRequestURI().replace('/', '.'); - } - - @Override - protected String getPermissionInstance(HttpServletRequest request) { - return NetworkUtil.getHostname(); - } - - public abstract String getPermissionTypeRoot(); -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyJerseyServer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyJerseyServer.java index df6823a0..d4c392b9 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyJerseyServer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyJerseyServer.java @@ -2,8 +2,9 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2019-2020 Nordix Foundation. + * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2019-2020,2023 Nordix Foundation. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,12 +22,11 @@ package org.onap.policy.common.endpoints.http.server.internal; -import io.swagger.jersey.config.JerseyJaxrsConfig; -import java.util.HashMap; -import java.util.Map; import org.apache.commons.lang3.StringUtils; +import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.glassfish.jersey.server.ServerProperties; +import org.glassfish.jersey.servlet.ServletContainer; import org.onap.policy.common.endpoints.http.server.JsonExceptionMapper; import org.onap.policy.common.gson.GsonMessageBodyHandler; import org.onap.policy.common.utils.network.NetworkUtil; @@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory; * * <p>Note: the serialization provider will always be added to the server's class providers, as will the swagger * providers (assuming swagger has been enabled). This happens whether {@link #addServletClass(String, String)} is used - * or {@link #addServletPackage(String, String)} is used. Thus it's possible to have both the server's class provider + * or {@link #addServletPackage(String, String)} is used. Thus, it's possible to have both the server's class provider * property and the server's package provider property populated. */ public class JettyJerseyServer extends JettyServletServer { @@ -67,13 +67,7 @@ public class JettyJerseyServer extends JettyServletServer { * Jersey GSON Classes Init Param Value. */ protected static final String JERSEY_GSON_INIT_CLASSNAMES_PARAM_VALUE = - String.join(",", GsonMessageBodyHandler.class.getName(), JsonExceptionMapper.class.getName()); - - /** - * Jersey Swagger Classes Init Param Value. - */ - protected static final String SWAGGER_INIT_CLASSNAMES_PARAM_VALUE = - "io.swagger.jaxrs.listing.ApiListingResource," + "io.swagger.jaxrs.listing.SwaggerSerializers"; + String.join(",", GsonMessageBodyHandler.class.getName(), JsonExceptionMapper.class.getName()); /** * Logger. @@ -81,11 +75,6 @@ public class JettyJerseyServer extends JettyServletServer { protected static Logger logger = LoggerFactory.getLogger(JettyJerseyServer.class); /** - * Container for servlets. - */ - protected final Map<String, ServletHolder> servlets = new HashMap<>(); - - /** * Swagger ID. */ protected String swaggerId = null; @@ -102,14 +91,15 @@ public class JettyJerseyServer extends JettyServletServer { * @param https enable https? * @param host host server host * @param port port server port + * @param sniHostCheck SNI Host checking flag * @param swagger support swagger? * @param contextPath context path - * * @throws IllegalArgumentException in invalid arguments are provided */ - public JettyJerseyServer(String name, boolean https, String host, int port, String contextPath, boolean swagger) { + public JettyJerseyServer(String name, boolean https, String host, int port, boolean sniHostCheck, + String contextPath, boolean swagger) { - super(name, https, host, port, contextPath); + super(name, https, host, port, sniHostCheck, contextPath); if (swagger) { this.swaggerId = "swagger-" + this.port; attachSwaggerServlet(https); @@ -121,7 +111,10 @@ public class JettyJerseyServer extends JettyServletServer { */ protected void attachSwaggerServlet(boolean https) { - ServletHolder swaggerServlet = context.addServlet(JerseyJaxrsConfig.class, "/"); + ServletContextHandler handler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS); + handler.setContextPath("/"); + + ServletHolder swaggerServlet = handler.addServlet(ServletContainer.class, "/*"); String hostname = this.connector.getHost(); if (StringUtils.isBlank(hostname) || hostname.equals(NetworkUtil.IPV4_WILDCARD_ADDRESS)) { @@ -129,7 +122,7 @@ public class JettyJerseyServer extends JettyServletServer { } swaggerServlet.setInitParameter(SWAGGER_API_BASEPATH, - ((https) ? "https://" : "http://") + hostname + ":" + this.connector.getPort() + "/"); + ((https) ? "https://" : "http://") + hostname + ":" + this.connector.getPort() + "/"); swaggerServlet.setInitParameter(SWAGGER_CONTEXT_ID, swaggerId); swaggerServlet.setInitParameter(SWAGGER_SCANNER_ID, swaggerId); swaggerServlet.setInitParameter(SWAGGER_PRETTY_PRINT, "true"); @@ -149,15 +142,10 @@ public class JettyJerseyServer extends JettyServletServer { * @throws IllegalArgumentException if invalid arguments are provided */ protected synchronized ServletHolder getServlet(String servletPath) { - - return servlets.computeIfAbsent(servletPath, key -> { - - ServletHolder jerseyServlet = - context.addServlet(org.glassfish.jersey.servlet.ServletContainer.class, servletPath); - jerseyServlet.setInitOrder(0); - - return jerseyServlet; - }); + ServletHolder jerseyServlet = + super.getServlet(org.glassfish.jersey.servlet.ServletContainer.class, servletPath); + jerseyServlet.setInitOrder(0); + return jerseyServlet; } @Override @@ -236,8 +224,8 @@ public class JettyJerseyServer extends JettyServletServer { initClasses = classProvider; if (this.swaggerId != null) { - initClasses += "," + SWAGGER_INIT_CLASSNAMES_PARAM_VALUE; - + jerseyServlet.setInitParameter("jersey.config.server.provider.packages", + "io.swagger.v3.jaxrs2.integration.resources,io.swagger.sample.resource"); jerseyServlet.setInitParameter(SWAGGER_CONTEXT_ID, swaggerId); jerseyServlet.setInitParameter(SWAGGER_SCANNER_ID, swaggerId); } @@ -258,9 +246,9 @@ public class JettyJerseyServer extends JettyServletServer { @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("JettyJerseyServer [Jerseyservlets=").append(servlets).append(", swaggerId=").append(swaggerId) - .append(", toString()=").append(super.toString()).append("]"); - return builder.toString(); + return "JettyJerseyServer [Jerseyservlets=" + servlets + + ", swaggerId=" + swaggerId + + ", toString()=" + super.toString() + + "]"; } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyServletServer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyServletServer.java index 699d5d43..78858a77 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyServletServer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyServletServer.java @@ -2,8 +2,9 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2019-2020 Nordix Foundation. + * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2019-2020, 2023-2024 Nordix Foundation. + * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +22,15 @@ package org.onap.policy.common.endpoints.http.server.internal; +import io.prometheus.client.hotspot.DefaultExports; +import io.prometheus.client.servlet.jakarta.exporter.MetricsServlet; +import jakarta.servlet.Servlet; import java.util.EnumSet; -import javax.servlet.DispatcherType; +import java.util.HashMap; +import java.util.Map; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; import org.eclipse.jetty.security.ConstraintMapping; import org.eclipse.jetty.security.ConstraintSecurityHandler; import org.eclipse.jetty.security.HashLoginService; @@ -35,60 +43,70 @@ import org.eclipse.jetty.server.SecureRequestCustomizer; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.Slf4jRequestLogWriter; -import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.util.security.Constraint; import org.eclipse.jetty.util.security.Credential; import org.eclipse.jetty.util.ssl.SslContextFactory; -import org.onap.aaf.cadi.filter.CadiFilter; import org.onap.policy.common.endpoints.http.server.HttpServletServer; -import org.onap.policy.common.gson.annotation.GsonJsonIgnore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Http Server implementation using Embedded Jetty. */ +@ToString public abstract class JettyServletServer implements HttpServletServer, Runnable { /** * Keystore/Truststore system property names. */ public static final String SYSTEM_KEYSTORE_PROPERTY_NAME = "javax.net.ssl.keyStore"; - public static final String SYSTEM_KEYSTORE_PASSWORD_PROPERTY_NAME = "javax.net.ssl.keyStorePassword"; //NOSONAR + public static final String SYSTEM_KEYSTORE_PASSWORD_PROPERTY_NAME = "javax.net.ssl.keyStorePassword"; // NOSONAR public static final String SYSTEM_TRUSTSTORE_PROPERTY_NAME = "javax.net.ssl.trustStore"; - public static final String SYSTEM_TRUSTSTORE_PASSWORD_PROPERTY_NAME = "javax.net.ssl.trustStorePassword"; //NOSONAR + public static final String SYSTEM_TRUSTSTORE_PASSWORD_PROPERTY_NAME = "javax.net.ssl.trustStorePassword"; // NOSONAR /** * Logger. */ - private static Logger logger = LoggerFactory.getLogger(JettyServletServer.class); + private static final Logger logger = LoggerFactory.getLogger(JettyServletServer.class); private static final String NOT_SUPPORTED = " is not supported on this type of jetty server"; /** * Server name. */ + @Getter protected final String name; /** * Server host address. */ + @Getter protected final String host; /** * Server port to bind. */ + @Getter protected final int port; /** - * Server auth user name. + * Should SNI host checking be done. */ + @Getter + protected boolean sniHostCheck; + + /** + * Server auth username. + */ + @Getter protected String user; /** * Server auth password name. */ + @Getter protected String password; /** @@ -117,9 +135,15 @@ public abstract class JettyServletServer implements HttpServletServer, Runnable protected Thread jettyThread; /** + * Container for default servlets. + */ + protected final Map<String, ServletHolder> servlets = new HashMap<>(); + + /** * Start condition. */ - protected Object startCondition = new Object(); + @ToString.Exclude + protected final Object startCondition = new Object(); /** * Constructor. @@ -127,11 +151,13 @@ public abstract class JettyServletServer implements HttpServletServer, Runnable * @param name server name * @param host server host * @param port server port + * @param sniHostCheck SNI Host checking flag * @param contextPath context path * * @throws IllegalArgumentException if invalid parameters are passed in */ - public JettyServletServer(String name, boolean https, String host, int port, String contextPath) { + protected JettyServletServer(String name, boolean https, String host, int port, boolean sniHostCheck, + String contextPath) { String srvName = name; if (srvName == null || srvName.isEmpty()) { @@ -156,6 +182,7 @@ public abstract class JettyServletServer implements HttpServletServer, Runnable this.host = srvHost; this.port = port; + this.sniHostCheck = sniHostCheck; this.contextPath = ctxtPath; @@ -164,8 +191,7 @@ public abstract class JettyServletServer implements HttpServletServer, Runnable this.jettyServer = new Server(); - CustomRequestLog requestLog = - new CustomRequestLog(new Slf4jRequestLogWriter(), CustomRequestLog.EXTENDED_NCSA_FORMAT); + var requestLog = new CustomRequestLog(new Slf4jRequestLogWriter(), CustomRequestLog.EXTENDED_NCSA_FORMAT); this.jettyServer.setRequestLog(requestLog); if (https) { @@ -183,8 +209,8 @@ public abstract class JettyServletServer implements HttpServletServer, Runnable this.jettyServer.setHandler(context); } - public JettyServletServer(String name, String host, int port, String contextPath) { - this(name, false, host, port, contextPath); + protected JettyServletServer(String name, String host, int port, boolean sniHostCheck, String contextPath) { + this(name, false, host, port, sniHostCheck, contextPath); } @Override @@ -198,7 +224,20 @@ public abstract class JettyServletServer implements HttpServletServer, Runnable tempFilterPath = "/*"; } - context.addFilter(filterClass, tempFilterPath, EnumSet.of(DispatcherType.INCLUDE, DispatcherType.REQUEST)); + context.addFilter(filterClass, tempFilterPath, + EnumSet.of(jakarta.servlet.DispatcherType.INCLUDE, jakarta.servlet.DispatcherType.REQUEST)); + } + + protected ServletHolder getServlet(@NonNull Class<? extends Servlet> servlet, @NonNull String servletPath) { + synchronized (servlets) { + return servlets.computeIfAbsent(servletPath, key -> context.addServlet(servlet, servletPath)); + } + } + + protected ServletHolder getServlet(String servletClass, String servletPath) { + synchronized (servlets) { + return servlets.computeIfAbsent(servletPath, key -> context.addServlet(servletClass, servletPath)); + } } /** @@ -207,32 +246,35 @@ public abstract class JettyServletServer implements HttpServletServer, Runnable * @return the server connector */ public ServerConnector httpsConnector() { - SslContextFactory sslContextFactory = new SslContextFactory.Server(); + SslContextFactory.Server sslContextFactoryServer = new SslContextFactory.Server(); String keyStore = System.getProperty(SYSTEM_KEYSTORE_PROPERTY_NAME); if (keyStore != null) { - sslContextFactory.setKeyStorePath(keyStore); + sslContextFactoryServer.setKeyStorePath(keyStore); String ksPassword = System.getProperty(SYSTEM_KEYSTORE_PASSWORD_PROPERTY_NAME); if (ksPassword != null) { - sslContextFactory.setKeyStorePassword(ksPassword); + sslContextFactoryServer.setKeyStorePassword(ksPassword); } } String trustStore = System.getProperty(SYSTEM_TRUSTSTORE_PROPERTY_NAME); if (trustStore != null) { - sslContextFactory.setTrustStorePath(trustStore); + sslContextFactoryServer.setTrustStorePath(trustStore); String tsPassword = System.getProperty(SYSTEM_TRUSTSTORE_PASSWORD_PROPERTY_NAME); if (tsPassword != null) { - sslContextFactory.setTrustStorePassword(tsPassword); + sslContextFactoryServer.setTrustStorePassword(tsPassword); } } - HttpConfiguration https = new HttpConfiguration(); - https.addCustomizer(new SecureRequestCustomizer()); - return new ServerConnector(jettyServer, sslContextFactory, new HttpConnectionFactory(https)); + var httpsConfiguration = new HttpConfiguration(); + SecureRequestCustomizer src = new SecureRequestCustomizer(); + src.setSniHostCheck(sniHostCheck); + httpsConfiguration.addCustomizer(src); + + return new ServerConnector(jettyServer, sslContextFactoryServer, new HttpConnectionFactory(httpsConfiguration)); } public ServerConnector httpConnector() { @@ -240,21 +282,6 @@ public abstract class JettyServletServer implements HttpServletServer, Runnable } @Override - public void setAafAuthentication(String filterPath) { - this.addFilterClass(filterPath, CadiFilter.class.getName()); - } - - @Override - public boolean isAaf() { - for (FilterHolder filter : context.getServletHandler().getFilters()) { - if (CadiFilter.class.getName().equals(filter.getClassName())) { - return true; - } - } - return false; - } - - @Override public void setBasicAuthentication(String user, String password, String servletPath) { String srvltPath = servletPath; @@ -266,22 +293,26 @@ public abstract class JettyServletServer implements HttpServletServer, Runnable srvltPath = "/*"; } - final HashLoginService hashLoginService = new HashLoginService(); - final UserStore userStore = new UserStore(); - userStore.addUser(user, Credential.getCredential(password), new String[] {"user"}); + final var hashLoginService = new HashLoginService(); + final var userStore = new UserStore(); + userStore.addUser(user, Credential.getCredential(password), new String[] { + "user" + }); hashLoginService.setUserStore(userStore); hashLoginService.setName(this.connector.getName() + "-login-service"); - Constraint constraint = new Constraint(); + var constraint = new Constraint(); constraint.setName(Constraint.__BASIC_AUTH); - constraint.setRoles(new String[] {"user"}); + constraint.setRoles(new String[] { + "user" + }); constraint.setAuthenticate(true); - ConstraintMapping constraintMapping = new ConstraintMapping(); + var constraintMapping = new ConstraintMapping(); constraintMapping.setConstraint(constraint); constraintMapping.setPathSpec(srvltPath); - ConstraintSecurityHandler securityHandler = new ConstraintSecurityHandler(); + var securityHandler = new ConstraintSecurityHandler(); securityHandler.setAuthenticator(new BasicAuthenticator()); securityHandler.setRealmName(this.connector.getName() + "-realm"); securityHandler.addConstraintMapping(constraintMapping); @@ -312,6 +343,11 @@ public abstract class JettyServletServer implements HttpServletServer, Runnable } this.jettyServer.join(); + + } catch (InterruptedException e) { + logger.error("{}: error found while bringing up server", this, e); + Thread.currentThread().interrupt(); + } catch (Exception e) { logger.error("{}: error found while bringing up server", this, e); } @@ -454,56 +490,34 @@ public abstract class JettyServletServer implements HttpServletServer, Runnable } @Override - public int getPort() { - return this.port; + public void setSerializationProvider(String provider) { + throw new UnsupportedOperationException("setSerializationProvider()" + NOT_SUPPORTED); } - /** - * Get name. - * - * @return the name - */ @Override - public String getName() { - return name; - } - - /** - * Get host. - * - * @return the host - */ - public String getHost() { - return host; - } - - /** - * Get user. - * - * @return the user - */ - public String getUser() { - return user; + public void addServletClass(String servletPath, String servletClass) { + throw new UnsupportedOperationException("addServletClass()" + NOT_SUPPORTED); } - /** - * Get password. - * - * @return the password - */ - @GsonJsonIgnore - public String getPassword() { - return password; + @Override + public void addStdServletClass(@NonNull String servletPath, @NonNull String plainServletClass) { + this.getServlet(plainServletClass, servletPath); } @Override - public void setSerializationProvider(String provider) { - throw new UnsupportedOperationException("setSerializationProvider()" + NOT_SUPPORTED); + public void setPrometheus(String metricsPath) { + this.getServlet(MetricsServlet.class, metricsPath); + DefaultExports.initialize(); } @Override - public void addServletClass(String servletPath, String restClass) { - throw new UnsupportedOperationException("addServletClass()" + NOT_SUPPORTED); + public boolean isPrometheus() { + for (ServletHolder servlet : context.getServletHandler().getServlets()) { + if (MetricsServlet.class.getName().equals(servlet.getClassName())) { + return true; + } + } + return false; } @Override @@ -516,15 +530,4 @@ public abstract class JettyServletServer implements HttpServletServer, Runnable throw new UnsupportedOperationException("addServletResource()" + NOT_SUPPORTED); } - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("JettyServer [name=").append(name).append(", host=").append(host).append(", port=").append(port) - .append(", user=").append(user).append(", password=").append(password != null).append(", contextPath=") - .append(contextPath).append(", jettyServer=").append(jettyServer).append(", context=") - .append(this.context).append(", connector=").append(connector).append(", jettyThread=") - .append(jettyThread).append("]"); - return builder.toString(); - } - } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyStaticResourceServer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyStaticResourceServer.java index 15d9990b..ee2b0540 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyStaticResourceServer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyStaticResourceServer.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2020 Nordix Foundation. + * Copyright (C) 2020, 2023-2024 Nordix Foundation. + * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,8 +21,6 @@ package org.onap.policy.common.endpoints.http.server.internal; -import java.util.HashMap; -import java.util.Map; import lombok.ToString; import org.apache.commons.lang3.StringUtils; import org.eclipse.jetty.servlet.DefaultServlet; @@ -30,7 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Jetty Server that uses DefaultServlets to support web static resources management. + * Jetty Server that uses DefaultServlets to support web static resources' management. */ @ToString public class JettyStaticResourceServer extends JettyServletServer { @@ -56,24 +55,20 @@ public class JettyStaticResourceServer extends JettyServletServer { protected static Logger logger = LoggerFactory.getLogger(JettyStaticResourceServer.class); /** - * Container for default servlets. - */ - protected final Map<String, ServletHolder> servlets = new HashMap<>(); - - /** * Constructor. * * @param name name * @param https enable https? * @param host host server host * @param port port server port + * @param sniHostCheck SNI Host checking flag * @param contextPath context path - * * @throws IllegalArgumentException in invalid arguments are provided */ - public JettyStaticResourceServer(String name, boolean https, String host, int port, String contextPath) { + public JettyStaticResourceServer(String name, boolean https, String host, int port, boolean sniHostCheck, + String contextPath) { - super(name, https, host, port, contextPath); + super(name, https, host, port, sniHostCheck, contextPath); } /** @@ -84,15 +79,14 @@ public class JettyStaticResourceServer extends JettyServletServer { * * @throws IllegalArgumentException if invalid arguments are provided */ - protected synchronized ServletHolder getDefaultServlet(String servPath) { - - return servlets.computeIfAbsent(servPath, key -> context.addServlet(DefaultServlet.class, servPath)); + protected synchronized ServletHolder getDefaultServlet(String servletPath) { + return super.getServlet(DefaultServlet.class, servletPath); } @Override - public synchronized void addServletResource(String servletPath, String resoureBase) { + public synchronized void addServletResource(String servletPath, String resourceBase) { - if (StringUtils.isBlank(resoureBase)) { + if (StringUtils.isBlank(resourceBase)) { throw new IllegalArgumentException("No resourceBase provided"); } @@ -102,7 +96,7 @@ public class JettyStaticResourceServer extends JettyServletServer { ServletHolder defaultServlet = this.getDefaultServlet(servletPath); - defaultServlet.setInitParameter(SERVLET_HOLDER_RESOURCE_BASE, resoureBase); + defaultServlet.setInitParameter(SERVLET_HOLDER_RESOURCE_BASE, resourceBase); defaultServlet.setInitParameter(SERVLET_HOLDER_DIR_ALLOWED, "false"); defaultServlet.setInitParameter(SERVLET_HOLDER_PATH_INFO_ONLY, "true"); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/JsonListener.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/JsonListener.java index ff8cbc5b..2e171070 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/JsonListener.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/JsonListener.java @@ -2,7 +2,8 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +21,8 @@ package org.onap.policy.common.endpoints.listeners; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.utils.coder.Coder; @@ -33,6 +36,7 @@ import org.slf4j.LoggerFactory; * Listens for messages received on a topic, in JSON format, decodes them into a * {@link StandardCoderObject}, and then offers the objects to the subclass. */ +@NoArgsConstructor(access = AccessLevel.PROTECTED) public abstract class JsonListener implements TopicListener { private static final Logger logger = LoggerFactory.getLogger(JsonListener.class); @@ -41,13 +45,6 @@ public abstract class JsonListener implements TopicListener { */ private static final Coder coder = new StandardCoder(); - /** - * Constructs the object. - */ - public JsonListener() { - super(); - } - @Override public void onTopicEvent(CommInfrastructure infra, String topic, String event) { // decode from JSON into a standard object diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/MessageTypeDispatcher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/MessageTypeDispatcher.java index 31e670a0..41f9abdb 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/MessageTypeDispatcher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/MessageTypeDispatcher.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -80,7 +80,7 @@ public class MessageTypeDispatcher extends JsonListener { @Override public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco) { // extract the message type - final String type = sco.getString(messageFieldNames); + final var type = sco.getString(messageFieldNames); if (type == null) { logger.warn("unable to extract {}: {}", fullMessageFieldName, sco); return; diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcher.java index e4686c1e..e575e33b 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcher.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -111,7 +111,7 @@ public class RequestIdDispatcher<T> extends ScoListener<T> { public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, T message) { // extract the request id - String reqid = sco.getString(requestIdFieldNames); + var reqid = sco.getString(requestIdFieldNames); // dispatch the message if (Strings.isNullOrEmpty(reqid)) { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/ScoListener.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/ScoListener.java index a3d33965..dc6ff12a 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/ScoListener.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/ScoListener.java @@ -2,7 +2,8 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +21,8 @@ package org.onap.policy.common.endpoints.listeners; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.utils.coder.Coder; import org.onap.policy.common.utils.coder.CoderException; @@ -34,6 +37,7 @@ import org.slf4j.LoggerFactory; * * @param <T> type of message/POJO this handles */ +@AllArgsConstructor(access = AccessLevel.PROTECTED) public abstract class ScoListener<T> { private static final Logger logger = LoggerFactory.getLogger(ScoListener.class); @@ -49,15 +53,6 @@ public abstract class ScoListener<T> { private final Class<T> clazz; /** - * Constructs the object. - * - * @param clazz class of message this handles - */ - public ScoListener(Class<T> clazz) { - this.clazz = clazz; - } - - /** * Receives an event, translates it into the desired type of object, and passes it to * the subclass. * diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/RestClientParameters.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/RestClientParameters.java new file mode 100644 index 00000000..5d02e753 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/RestClientParameters.java @@ -0,0 +1,57 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.parameters; + +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; +import org.onap.policy.common.parameters.BeanValidationResult; +import org.onap.policy.common.parameters.ParameterGroup; +import org.onap.policy.common.parameters.ValidationStatus; + +public class RestClientParameters extends BusTopicParams implements ParameterGroup { + + private static final String MSG_IS_BLANK = "is blank"; + + @Override + public String getName() { + return getClientName(); + } + + @Override + public void setName(String name) { + setClientName(name); + } + + @Override + public BeanValidationResult validate() { + var result = new BeanValidationResult(getClientName(), this); + if (isHostnameInvalid()) { + result.addResult("hostname", getHostname(), ValidationStatus.INVALID, MSG_IS_BLANK); + } + if (isClientNameInvalid()) { + result.addResult("clientName", getClientName(), ValidationStatus.INVALID, MSG_IS_BLANK); + } + if (isPortInvalid()) { + result.addResult("port", getPort(), ValidationStatus.INVALID, "is not valid"); + } + return result; + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/RestServerParameters.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/RestServerParameters.java index 53154b9e..9ffe5cb7 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/RestServerParameters.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/RestServerParameters.java @@ -1,7 +1,8 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2019 Nordix Foundation. - * Modifications Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019,2023 Nordix Foundation. + * Modifications Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,10 +33,10 @@ import org.onap.policy.common.parameters.annotations.NotNull; * * @author Ajith Sreekumar (ajith.sreekumar@est.tech) */ -@NotNull @NotBlank @Getter public class RestServerParameters extends ParameterGroupImpl { + @NotNull private String host; @Min(value = 1) @@ -44,7 +45,11 @@ public class RestServerParameters extends ParameterGroupImpl { private String userName; private String password; private boolean https; + private boolean sniHostCHeck; private boolean aaf; + private boolean prometheus; + private String servletClass; + private String servletUriPath; public RestServerParameters() { super(RestServerParameters.class.getSimpleName()); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/TopicParameterGroup.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/TopicParameterGroup.java index 4cd3893d..d63134bc 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/TopicParameterGroup.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/TopicParameterGroup.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2019 Nordix Foundation. - * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019, 2024 Nordix Foundation. + * Modifications Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,7 @@ import java.util.List; import lombok.Getter; import lombok.Setter; import org.apache.commons.lang3.StringUtils; -import org.onap.policy.common.parameters.GroupValidationResult; +import org.onap.policy.common.parameters.BeanValidationResult; import org.onap.policy.common.parameters.ParameterGroupImpl; import org.onap.policy.common.parameters.ValidationStatus; import org.onap.policy.common.parameters.annotations.NotBlank; @@ -53,20 +53,20 @@ public class TopicParameterGroup extends ParameterGroupImpl { * {@inheritDoc}. */ @Override - public GroupValidationResult validate() { - GroupValidationResult result = super.validate(); + public BeanValidationResult validate() { + BeanValidationResult result = super.validate(); if (result.isValid()) { - StringBuilder errorMsg = new StringBuilder(); + var errorMsg = new StringBuilder(); StringBuilder missingSourceParams = checkMissingMandatoryParams(topicSources); - if (missingSourceParams.length() > 0) { + if (!missingSourceParams.isEmpty()) { errorMsg.append(missingSourceParams.append("missing in topicSources. ")); } StringBuilder missingSinkParams = checkMissingMandatoryParams(topicSinks); - if (missingSinkParams.length() > 0) { + if (!missingSinkParams.isEmpty()) { errorMsg.append(missingSinkParams.append("missing in topicSinks.")); } - if (errorMsg.length() > 0) { + if (!errorMsg.isEmpty()) { errorMsg.insert(0, "Mandatory parameters are missing. "); result.setResult(ValidationStatus.INVALID, errorMsg.toString()); } @@ -75,7 +75,7 @@ public class TopicParameterGroup extends ParameterGroupImpl { } private StringBuilder checkMissingMandatoryParams(List<TopicParameters> topicParametersList) { - StringBuilder missingParams = new StringBuilder(); + var missingParams = new StringBuilder(); for (TopicParameters topicParameters : topicParametersList) { if (StringUtils.isBlank(topicParameters.getTopic())) { missingParams.append("topic, "); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java index ef9259bf..5d36a313 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java @@ -2,7 +2,8 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2022,2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +21,11 @@ package org.onap.policy.common.endpoints.properties; -public class PolicyEndPointProperties { +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class PolicyEndPointProperties { /* Generic property suffixes */ @@ -28,8 +33,6 @@ public class PolicyEndPointProperties { public static final String PROPERTY_TOPIC_TOPICS_SUFFIX = ".topics"; public static final String PROPERTY_TOPIC_API_KEY_SUFFIX = ".apiKey"; public static final String PROPERTY_TOPIC_API_SECRET_SUFFIX = ".apiSecret"; - public static final String PROPERTY_TOPIC_AAF_MECHID_SUFFIX = ".aafMechId"; - public static final String PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX = ".aafPassword"; //NOSONAR public static final String PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX = ".effectiveTopic"; public static final String PROPERTY_TOPIC_EVENTS_SUFFIX = ".events"; public static final String PROPERTY_TOPIC_EVENTS_FILTER_SUFFIX = ".filter"; @@ -40,41 +43,20 @@ public class PolicyEndPointProperties { public static final String PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX = ".fetchTimeout"; public static final String PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX = ".fetchLimit"; public static final String PROPERTY_MANAGED_SUFFIX = ".managed"; - public static final String PROPERTY_AAF_SUFFIX = ".aaf"; + public static final String PROPERTY_ADDITIONAL_PROPS_SUFFIX = ".additionalProps"; public static final String PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX = ".partitionKey"; public static final String PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX = ".selfSignedCertificates"; - /* UEB Properties */ - - public static final String PROPERTY_UEB_SOURCE_TOPICS = "ueb.source.topics"; - public static final String PROPERTY_UEB_SINK_TOPICS = "ueb.sink.topics"; - - /* DMAAP Properties */ - - public static final String PROPERTY_DMAAP_SOURCE_TOPICS = "dmaap.source.topics"; - public static final String PROPERTY_DMAAP_SINK_TOPICS = "dmaap.sink.topics"; - - public static final String PROPERTY_DMAAP_DME2_PARTNER_SUFFIX = ".dme2.partner"; - public static final String PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX = ".dme2.routeOffer"; - public static final String PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX = ".dme2.environment"; - public static final String PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX = ".dme2.aft.environment"; - public static final String PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX = ".dme2.latitude"; - public static final String PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX = ".dme2.longitude"; - - public static final String PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX = ".dme2.epReadTimeoutMs"; - public static final String PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX = ".dme2.epConnTimeout"; - public static final String PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX = ".dme2.roundtripTimeoutMs"; - public static final String PROPERTY_DMAAP_DME2_VERSION_SUFFIX = ".dme2.version"; - public static final String PROPERTY_DMAAP_DME2_SERVICE_NAME_SUFFIX = ".dme2.serviceName"; - public static final String PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX = ".dme2.subContextPath"; - public static final String PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX = - ".dme2.sessionStickinessRequired"; - public static final String PROPERTY_NOOP_SOURCE_TOPICS = "noop.source.topics"; public static final String PROPERTY_NOOP_SINK_TOPICS = "noop.sink.topics"; + /* KAFKA Properties */ + + public static final String PROPERTY_KAFKA_SOURCE_TOPICS = "kafka.source.topics"; + public static final String PROPERTY_KAFKA_SINK_TOPICS = "kafka.sink.topics"; + /* HTTP Server Properties */ public static final String PROPERTY_HTTP_SERVER_SERVICES = "http.server.services"; @@ -92,8 +74,13 @@ public class PolicyEndPointProperties { public static final String PROPERTY_HTTP_REST_PACKAGES_SUFFIX = ".restPackages"; public static final String PROPERTY_HTTP_REST_URIPATH_SUFFIX = ".restUriPath"; + public static final String PROPERTY_HTTP_SERVLET_URIPATH_SUFFIX = ".servletUriPath"; + public static final String PROPERTY_HTTP_SERVLET_CLASS_SUFFIX = ".servletClass"; + public static final String PROPERTY_HTTP_PROMETHEUS_SUFFIX = ".prometheus"; + public static final String PROPERTY_HTTP_HTTPS_SUFFIX = ".https"; public static final String PROPERTY_HTTP_SWAGGER_SUFFIX = ".swagger"; + public static final String PROPERTY_HTTP_SNI_HOST_CHECK_SUFFIX = ".sniHostCheck"; public static final String PROPERTY_HTTP_SERIALIZATION_PROVIDER = ".serialization.provider"; @@ -103,19 +90,6 @@ public class PolicyEndPointProperties { public static final String PROPERTY_HTTP_URL_SUFFIX = PROPERTY_HTTP_CONTEXT_URIPATH_SUFFIX; - - /* DMaaP DME2 Topic Properties */ - - public static final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS"; - public static final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT"; - public static final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS"; - public static final String DME2_VERSION_PROPERTY = "Version"; - public static final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer"; - public static final String DME2_SERVICE_NAME_PROPERTY = "ServiceName"; - public static final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath"; - public static final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired"; - - /* Topic Sink Values */ /** @@ -145,9 +119,4 @@ public class PolicyEndPointProperties { * Definition of No limit fetching. */ public static final int NO_LIMIT_FETCH = -1; - - - private PolicyEndPointProperties() { - // do nothing - } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/report/HealthCheckReport.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/report/HealthCheckReport.java index fa2f3d23..bb80fccb 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/report/HealthCheckReport.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/report/HealthCheckReport.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2018 Ericsson. All rights reserved. - * Modifications Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2018, 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,11 +21,18 @@ package org.onap.policy.common.endpoints.report; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + /** * Class to represent health check report of a service. * * @author Ram Krishna Verma (ram.krishna.verma@ericsson.com) */ +@Getter +@Setter +@ToString public class HealthCheckReport { private String name; @@ -33,111 +40,4 @@ public class HealthCheckReport { private boolean healthy; private int code; private String message; - - /** - * Returns the name of this report. - * - * @return the name - */ - public String getName() { - return name; - } - - /** - * Set the name of this report. - * - * @param name the name to set - */ - public void setName(final String name) { - this.name = name; - } - - /** - * Returns the url of this report. - * - * @return the url - */ - public String getUrl() { - return url; - } - - /** - * Set the url of this report. - * - * @param url the url to set - */ - public void setUrl(final String url) { - this.url = url; - } - - /** - * Returns the health status of this report. - * - * @return the healthy - */ - public boolean isHealthy() { - return healthy; - } - - /** - * Set the health status of this report. - * - * @param healthy the healthy to set - */ - public void setHealthy(final boolean healthy) { - this.healthy = healthy; - } - - /** - * Returns the code of this report. - * - * @return the code - */ - public int getCode() { - return code; - } - - /** - * Set the code of this report. - * - * @param code the code to set - */ - public void setCode(final int code) { - this.code = code; - } - - /** - * Returns the message of this report. - * - * @return the message - */ - public String getMessage() { - return message; - } - - /** - * Set the message of this report. - * - * @param message the message to set - */ - public void setMessage(final String message) { - this.message = message; - } - - @Override - public String toString() { - final StringBuilder builder = new StringBuilder(); - builder.append("Report [name="); - builder.append(getName()); - builder.append(", url="); - builder.append(getUrl()); - builder.append(", healthy="); - builder.append(isHealthy()); - builder.append(", code="); - builder.append(getCode()); - builder.append(", message="); - builder.append(getMessage()); - builder.append("]"); - return builder.toString(); - } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/DmaapPropertyUtils.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/DmaapPropertyUtils.java deleted file mode 100644 index 5cb220b2..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/DmaapPropertyUtils.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.utils; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.commons.lang3.StringUtils; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; - -public class DmaapPropertyUtils { - - /** - * Maps a topic property to a DME property. - */ - private static final Map<String, String> PROP_TO_DME; - - static { - Map<String, String> map = new HashMap<>(); - - map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX, - PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY); - - map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX, - PolicyEndPointProperties.DME2_READ_TIMEOUT_PROPERTY); - - map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX, - PolicyEndPointProperties.DME2_EP_CONN_TIMEOUT_PROPERTY); - - map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX, - PolicyEndPointProperties.DME2_ROUNDTRIP_TIMEOUT_PROPERTY); - - map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX, - PolicyEndPointProperties.DME2_VERSION_PROPERTY); - - map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX, - PolicyEndPointProperties.DME2_SUBCONTEXT_PATH_PROPERTY); - - map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX, - PolicyEndPointProperties.DME2_SESSION_STICKINESS_REQUIRED_PROPERTY); - - PROP_TO_DME = Collections.unmodifiableMap(map); - } - - private DmaapPropertyUtils() { - // do nothing - } - - /** - * Makes a topic builder, configuring it with properties that are common to both - * sources and sinks. - * - * @param props properties to be used to configure the builder - * @param topic topic being configured - * @param servers target servers - * @return a topic builder - */ - public static TopicParamsBuilder makeBuilder(PropertyUtils props, String topic, String servers) { - - /* Additional DME2 Properties */ - - Map<String, String> dme2AdditionalProps = new HashMap<>(); - - for (Map.Entry<String, String> ent : PROP_TO_DME.entrySet()) { - String propName = ent.getKey(); - String value = props.getString(propName, null); - - if (!StringUtils.isBlank(value)) { - String dmeName = ent.getValue(); - dme2AdditionalProps.put(dmeName, value); - } - } - - final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - - return BusTopicParams.builder() - .servers(serverList) - .topic(topic) - .effectiveTopic(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, - topic)) - .apiKey(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX, null)) - .apiSecret(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX, null)) - .userName(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX, null)) - .password(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX, null)) - .environment(props.getString(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX, - null)) - .aftEnvironment(props.getString( - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX, null)) - .partner(props.getString(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX, null)) - .latitude(props.getString(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX, null)) - .longitude(props.getString(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX, null)) - .additionalProps(dme2AdditionalProps) - .managed(props.getBoolean(PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, true)) - .useHttps(props.getBoolean(PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX, false)) - .allowSelfSignedCerts(props.getBoolean( - PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX, false)); - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java new file mode 100644 index 00000000..2e137ce7 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java @@ -0,0 +1,80 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2022-2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.utils; + +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_ADDITIONAL_PROPS_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.re2j.Pattern; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class KafkaPropertyUtils { + private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*"); + + /** + * Makes a topic builder, configuring it with properties that are common to both + * sources and sinks. + * + * @param props properties to be used to configure the builder + * @param topic topic being configured + * @param servers target servers + * @return a topic builder + */ + public static TopicParamsBuilder makeBuilder(PropertyUtils props, String topic, String servers) { + + final List<String> serverList = new ArrayList<>(Arrays.asList(COMMA_SPACE_PAT.split(servers))); + return BusTopicParams.builder() + .servers(serverList) + .topic(topic) + .effectiveTopic(props.getString(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic)) + .managed(props.getBoolean(PROPERTY_MANAGED_SUFFIX, true)) + .additionalProps(getAdditionalProps(props.getString(PROPERTY_ADDITIONAL_PROPS_SUFFIX, ""))); + } + + private static Map<String, String> getAdditionalProps(String additionalPropsString) { + try { + Map<String, String> additionalProps = new HashMap<>(); + var converted = new ObjectMapper().readValue(additionalPropsString, Map.class); + converted.forEach((k, v) -> { + if (k instanceof String key && v instanceof String value) { + additionalProps.put(key, value); + } + }); + return additionalProps; + } catch (Exception e) { + return Collections.emptyMap(); + } + + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/NetLoggerUtil.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/NetLoggerUtil.java index c7dd516f..6002c3f6 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/NetLoggerUtil.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/NetLoggerUtil.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ package org.onap.policy.common.endpoints.utils; +import lombok.Getter; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.features.NetLoggerFeatureProviders; import org.onap.policy.common.utils.services.FeatureApiUtils; @@ -37,7 +38,8 @@ public class NetLoggerUtil { * Loggers. */ private static final Logger logger = LoggerFactory.getLogger(NetLoggerUtil.class); - private static final Logger netLogger = LoggerFactory.getLogger("network"); + @Getter + private static final Logger networkLogger = LoggerFactory.getLogger("network"); /** * Constant for the system line separator. @@ -52,15 +54,6 @@ public class NetLoggerUtil { } /** - * Get Network Logger. - * - * @return logger instance - */ - public static Logger getNetworkLogger() { - return netLogger; - } - - /** * Logs a message to the network logger. * * @param type can either be IN or OUT @@ -69,7 +62,7 @@ public class NetLoggerUtil { * @param message message to be logged */ public static void log(EventType type, CommInfrastructure protocol, String topic, String message) { - log(netLogger, type, protocol, topic, message); + log(networkLogger, type, protocol, topic, message); } /** @@ -85,7 +78,7 @@ public class NetLoggerUtil { String message) { if (eventLogger == null) { logger.debug("the logger is null, defaulting to network logger"); - eventLogger = netLogger; + eventLogger = networkLogger; } if (featureBeforeLog(eventLogger, type, protocol, topic, message)) { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/PropertyUtils.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/PropertyUtils.java index 7f15502e..904f9535 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/PropertyUtils.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/PropertyUtils.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,11 +21,13 @@ package org.onap.policy.common.endpoints.utils; import java.util.Properties; +import lombok.AllArgsConstructor; import org.apache.commons.lang3.StringUtils; /** * Utilities for extracting property values and converting them to other types. */ +@AllArgsConstructor public class PropertyUtils { /** * Properties on which to work. @@ -42,20 +44,6 @@ public class PropertyUtils { */ private TriConsumer<String, String, Exception> invalidHandler; - - /** - * Constructs the object. - * - * @param properties properties on which to work - * @param prefix prefix to prepend to property names - * @param invalidHandler function to invoke if a property value is invalid - */ - public PropertyUtils(Properties properties, String prefix, TriConsumer<String, String, Exception> invalidHandler) { - this.properties = properties; - this.prefix = prefix; - this.invalidHandler = invalidHandler; - } - /** * Gets a string property. * diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/UebPropertyUtils.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/UebPropertyUtils.java deleted file mode 100644 index d0217518..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/UebPropertyUtils.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.utils; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; - -public class UebPropertyUtils { - - private UebPropertyUtils() { - // do nothing - } - - /** - * Makes a topic builder, configuring it with properties that are common to both - * sources and sinks. - * - * @param props properties to be used to configure the builder - * @param topic topic being configured - * @param servers target servers - * @return a topic builder - */ - public static TopicParamsBuilder makeBuilder(PropertyUtils props, String topic, String servers) { - - final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - - return BusTopicParams.builder() - .servers(serverList) - .topic(topic) - .effectiveTopic(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, - topic)) - .apiKey(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX, null)) - .apiSecret(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX, null)) - .consumerGroup(props.getString( - PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null)) - .consumerInstance(props.getString( - PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null)) - .managed(props.getBoolean(PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, true)) - .useHttps(props.getBoolean(PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX, false)) - .allowSelfSignedCerts(props.getBoolean( - PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX, - false)); - } -} |