aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java6
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java44
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java70
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java4
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java3
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactories.java43
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java25
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java105
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java25
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java104
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java196
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java214
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java253
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusHelper.java95
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java209
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java18
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java4
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java132
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java139
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/AuthorizationFilter.java9
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServer.java14
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java14
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/RestServer.java28
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/aaf/AafAuthFilter.java46
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/aaf/AafGranularAuthFilter.java49
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyServletServer.java19
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyStaticResourceServer.java4
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/TopicParameterGroup.java8
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java38
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/DmaapPropertyUtils.java122
30 files changed, 47 insertions, 1993 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java
index 69bee717..d09e7353 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java
@@ -4,7 +4,7 @@
* ================================================================================
* Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2019 Samsung Electronics Co., Ltd.
- * Copyright (C) 2022 Nordix Foundation.
+ * Copyright (C) 2022,2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -41,10 +41,6 @@ public interface Topic extends TopicRegisterable, Startable, Lockable {
*/
UEB,
/**
- * DMAAP Communication Infrastructure.
- */
- DMAAP,
- /**
* KAFKA Communication Infrastructure.
*/
KAFKA,
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java
index 9bf3731a..8cae5bd1 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java
@@ -3,7 +3,7 @@
* ONAP
* ================================================================================
* Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2022 Nordix Foundation.
+ * Copyright (C) 2022,2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,8 +25,6 @@ import java.util.List;
import java.util.Properties;
import org.onap.policy.common.capabilities.Lockable;
import org.onap.policy.common.capabilities.Startable;
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink;
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink;
import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink;
@@ -37,7 +35,7 @@ import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
import org.onap.policy.common.endpoints.parameters.TopicParameters;
/**
- * Abstraction to managed the system's Networked Topic Endpoints, sources of all events input into
+ * Abstraction to manage the system's Networked Topic Endpoints, sources of all events input into
* the System.
*/
public interface TopicEndpoint extends Startable, Lockable {
@@ -143,18 +141,6 @@ public interface TopicEndpoint extends Startable, Lockable {
UebTopicSource getUebTopicSource(String topicName);
/**
- * Get the DMAAP Topic Source for the given topic name.
- *
- * @param topicName the topic name
- *
- * @return the DMAAP Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for example multiple
- * TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- DmaapTopicSource getDmaapTopicSource(String topicName);
-
- /**
* Get the Noop Source for the given topic name.
*
* @param topicName the topic name.
@@ -236,18 +222,6 @@ public interface TopicEndpoint extends Startable, Lockable {
NoopTopicSink getNoopTopicSink(String topicName);
/**
- * Get the DMAAP Topic Source for the given topic name.
- *
- * @param topicName the topic name
- *
- * @return the Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for example multiple
- * TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- DmaapTopicSink getDmaapTopicSink(String topicName);
-
- /**
* Get the KAFKA Topic Source for the given topic name.
*
* @param topicName the topic name
@@ -267,13 +241,6 @@ public interface TopicEndpoint extends Startable, Lockable {
List<UebTopicSource> getUebTopicSources();
/**
- * Gets only the DMAAP Topic Sources.
- *
- * @return the DMAAP Topic Source List
- */
- List<DmaapTopicSource> getDmaapTopicSources();
-
- /**
* Gets only the KAFKA Topic Sources.
*
* @return the KAFKA Topic Source List
@@ -295,13 +262,6 @@ public interface TopicEndpoint extends Startable, Lockable {
List<UebTopicSink> getUebTopicSinks();
/**
- * Gets only the DMAAP Topic Sinks.
- *
- * @return the DMAAP Topic Sink List
- */
- List<DmaapTopicSink> getDmaapTopicSinks();
-
- /**
* Gets only the KAFKA Topic Sinks.
*
* @return the KAFKA Topic Sinks List
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java
index 5ba32b28..4ec8eb54 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java
@@ -3,7 +3,7 @@
* ONAP
* ================================================================================
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2022-2023 Nordix Foundation.
+ * Modifications Copyright (C) 2022-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,9 +28,6 @@ import java.util.Objects;
import java.util.Properties;
import lombok.Getter;
import org.onap.policy.common.capabilities.Startable;
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicFactories;
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink;
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicFactories;
import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink;
import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
@@ -96,9 +93,6 @@ class TopicEndpointProxy implements TopicEndpoint {
case UEB:
sources.add(UebTopicFactories.getSourceFactory().build(param));
break;
- case DMAAP:
- sources.add(DmaapTopicFactories.getSourceFactory().build(param));
- break;
case KAFKA:
sources.add(KafkaTopicFactories.getSourceFactory().build(param));
break;
@@ -121,14 +115,12 @@ class TopicEndpointProxy implements TopicEndpoint {
public List<TopicSource> addTopicSources(Properties properties) {
// 1. Create UEB Sources
- // 2. Create DMAAP Sources
- // 3. Create KAFKA Sources
- // 4. Create NOOP Sources
+ // 2. Create KAFKA Sources
+ // 3. Create NOOP Sources
List<TopicSource> sources = new ArrayList<>();
sources.addAll(UebTopicFactories.getSourceFactory().build(properties));
- sources.addAll(DmaapTopicFactories.getSourceFactory().build(properties));
sources.addAll(KafkaTopicFactories.getSourceFactory().build(properties));
sources.addAll(NoopTopicFactories.getSourceFactory().build(properties));
@@ -152,9 +144,6 @@ class TopicEndpointProxy implements TopicEndpoint {
case UEB:
sinks.add(UebTopicFactories.getSinkFactory().build(param));
break;
- case DMAAP:
- sinks.add(DmaapTopicFactories.getSinkFactory().build(param));
- break;
case KAFKA:
sinks.add(KafkaTopicFactories.getSinkFactory().build(param));
break;
@@ -176,14 +165,12 @@ class TopicEndpointProxy implements TopicEndpoint {
@Override
public List<TopicSink> addTopicSinks(Properties properties) {
// 1. Create UEB Sinks
- // 2. Create DMAAP Sinks
- // 3. Create KAFKA Sinks
- // 4. Create NOOP Sinks
+ // 2. Create KAFKA Sinks
+ // 3. Create NOOP Sinks
final List<TopicSink> sinks = new ArrayList<>();
sinks.addAll(UebTopicFactories.getSinkFactory().build(properties));
- sinks.addAll(DmaapTopicFactories.getSinkFactory().build(properties));
sinks.addAll(KafkaTopicFactories.getSinkFactory().build(properties));
sinks.addAll(NoopTopicFactories.getSinkFactory().build(properties));
@@ -204,7 +191,6 @@ class TopicEndpointProxy implements TopicEndpoint {
final List<TopicSource> sources = new ArrayList<>();
sources.addAll(UebTopicFactories.getSourceFactory().inventory());
- sources.addAll(DmaapTopicFactories.getSourceFactory().inventory());
sources.addAll(KafkaTopicFactories.getSourceFactory().inventory());
sources.addAll(NoopTopicFactories.getSourceFactory().inventory());
@@ -228,12 +214,6 @@ class TopicEndpointProxy implements TopicEndpoint {
}
try {
- sources.add(Objects.requireNonNull(this.getDmaapTopicSource(topic)));
- } catch (final Exception e) {
- logger.debug("No DMAAP source for topic: {}", topic, e);
- }
-
- try {
sources.add(Objects.requireNonNull(this.getKafkaTopicSource(topic)));
} catch (final Exception e) {
logger.debug("No KAFKA source for topic: {}", topic, e);
@@ -255,7 +235,6 @@ class TopicEndpointProxy implements TopicEndpoint {
final List<TopicSink> sinks = new ArrayList<>();
sinks.addAll(UebTopicFactories.getSinkFactory().inventory());
- sinks.addAll(DmaapTopicFactories.getSinkFactory().inventory());
sinks.addAll(KafkaTopicFactories.getSinkFactory().inventory());
sinks.addAll(NoopTopicFactories.getSinkFactory().inventory());
@@ -278,12 +257,6 @@ class TopicEndpointProxy implements TopicEndpoint {
}
try {
- sinks.add(Objects.requireNonNull(this.getDmaapTopicSink(topic)));
- } catch (final Exception e) {
- logger.debug("No DMAAP sink for topic: {}", topic, e);
- }
-
- try {
sinks.add(Objects.requireNonNull(this.getKafkaTopicSink(topic)));
} catch (final Exception e) {
logger.debug("No KAFKA sink for topic: {}", topic, e);
@@ -313,12 +286,6 @@ class TopicEndpointProxy implements TopicEndpoint {
}
try {
- sinks.add(this.getDmaapTopicSink(topicName));
- } catch (final Exception e) {
- logNoSink(topicName, e);
- }
-
- try {
sinks.add(this.getKafkaTopicSink(topicName));
} catch (final Exception e) {
logNoSink(topicName, e);
@@ -341,12 +308,6 @@ class TopicEndpointProxy implements TopicEndpoint {
@GsonJsonIgnore
@Override
- public List<DmaapTopicSource> getDmaapTopicSources() {
- return DmaapTopicFactories.getSourceFactory().inventory();
- }
-
- @GsonJsonIgnore
- @Override
public List<KafkaTopicSource> getKafkaTopicSources() {
return KafkaTopicFactories.getSourceFactory().inventory();
}
@@ -363,12 +324,6 @@ class TopicEndpointProxy implements TopicEndpoint {
return UebTopicFactories.getSinkFactory().inventory();
}
- @GsonJsonIgnore
- @Override
- public List<DmaapTopicSink> getDmaapTopicSinks() {
- return DmaapTopicFactories.getSinkFactory().inventory();
- }
-
@Override
@GsonJsonIgnore
public List<KafkaTopicSink> getKafkaTopicSinks() {
@@ -459,9 +414,6 @@ class TopicEndpointProxy implements TopicEndpoint {
UebTopicFactories.getSourceFactory().destroy();
UebTopicFactories.getSinkFactory().destroy();
- DmaapTopicFactories.getSourceFactory().destroy();
- DmaapTopicFactories.getSinkFactory().destroy();
-
KafkaTopicFactories.getSourceFactory().destroy();
KafkaTopicFactories.getSinkFactory().destroy();
@@ -527,7 +479,6 @@ class TopicEndpointProxy implements TopicEndpoint {
return switch (commType) {
case UEB -> this.getUebTopicSource(topicName);
- case DMAAP -> this.getDmaapTopicSource(topicName);
case KAFKA -> this.getKafkaTopicSource(topicName);
case NOOP -> this.getNoopTopicSource(topicName);
default -> throw new UnsupportedOperationException("Unsupported " + commType.name());
@@ -546,7 +497,6 @@ class TopicEndpointProxy implements TopicEndpoint {
return switch (commType) {
case UEB -> this.getUebTopicSink(topicName);
- case DMAAP -> this.getDmaapTopicSink(topicName);
case KAFKA -> this.getKafkaTopicSink(topicName);
case NOOP -> this.getNoopTopicSink(topicName);
default -> throw new UnsupportedOperationException("Unsupported " + commType.name());
@@ -564,11 +514,6 @@ class TopicEndpointProxy implements TopicEndpoint {
}
@Override
- public DmaapTopicSource getDmaapTopicSource(String topicName) {
- return DmaapTopicFactories.getSourceFactory().get(topicName);
- }
-
- @Override
public KafkaTopicSource getKafkaTopicSource(String topicName) {
return KafkaTopicFactories.getSourceFactory().get(topicName);
}
@@ -579,11 +524,6 @@ class TopicEndpointProxy implements TopicEndpoint {
}
@Override
- public DmaapTopicSink getDmaapTopicSink(String topicName) {
- return DmaapTopicFactories.getSinkFactory().get(topicName);
- }
-
- @Override
public KafkaTopicSink getKafkaTopicSink(String topicName) {
return KafkaTopicFactories.getSinkFactory().get(topicName);
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java
index 5ca87732..4073f5a7 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java
@@ -3,7 +3,7 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2023 Nordix Foundation.
+ * Modifications Copyright (C) 2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,7 +24,7 @@ package org.onap.policy.common.endpoints.event.comm.bus;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
/**
- * Topic Sink over Bus Infrastructure (DMAAP/UEB).
+ * Topic Sink over Bus Infrastructure (KAFKA/UEB).
*/
public interface BusTopicSink extends ApiKeyEnabled, TopicSink {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java
index cd9bc015..f1af8a21 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java
@@ -3,6 +3,7 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,7 +24,7 @@ package org.onap.policy.common.endpoints.event.comm.bus;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
/**
- * Generic Topic Source for UEB/DMAAP Communication Infrastructure.
+ * Generic Topic Source for UEB/KAFKA Communication Infrastructure.
*
*/
public interface BusTopicSource extends ApiKeyEnabled, TopicSource {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactories.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactories.java
deleted file mode 100644
index d5a46f8f..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactories.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import lombok.Getter;
-
-public class DmaapTopicFactories {
-
- /**
- * Factory for instantiation and management of sinks.
- */
- @Getter
- private static final DmaapTopicSinkFactory sinkFactory = new IndexedDmaapTopicSinkFactory();
-
- /**
- * Factory for instantiation and management of sources.
- */
- @Getter
- private static final DmaapTopicSourceFactory sourceFactory = new IndexedDmaapTopicSourceFactory();
-
-
- private DmaapTopicFactories() {
- // do nothing
- }
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java
deleted file mode 100644
index 805ed108..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-public interface DmaapTopicSink extends BusTopicSink {
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java
deleted file mode 100644
index 4409e827..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
- * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import java.util.List;
-import java.util.Properties;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-
-/**
- * DMAAP Topic Sink Factory.
- */
-public interface DmaapTopicSinkFactory {
-
- /**
- * <pre>
- * Instantiate a new DMAAP Topic Sink, with following params.
- * servers list of servers
- * topic topic name
- * apiKey API Key
- * apiSecret API Secret
- * userName AAF user name
- * password AAF password
- * partitionKey Consumer Group
- * environment DME2 environment
- * aftEnvironment DME2 AFT environment
- * partner DME2 Partner
- * latitude DME2 latitude
- * longitude DME2 longitude
- * additionalProps additional properties to pass to DME2
- * managed is this sink endpoint managed?
- * </pre>
- * @param busTopicParams parameter object
- * @return DmaapTopicSink object
- * @throws IllegalArgumentException if invalid parameters are present
- */
- DmaapTopicSink build(BusTopicParams busTopicParams);
-
- /**
- * Creates an DMAAP Topic Sink based on properties files.
- *
- * @param properties Properties containing initialization values
- * @return an DMAAP Topic Sink
- * @throws IllegalArgumentException if invalid parameters are present
- */
- List<DmaapTopicSink> build(Properties properties);
-
- /**
- * Instantiates a new DMAAP Topic Sink.
- *
- * @param servers list of servers
- * @param topic topic name
- * @return an DMAAP Topic Sink
- * @throws IllegalArgumentException if invalid parameters are present
- */
- DmaapTopicSink build(List<String> servers, String topic);
-
- /**
- * Destroys an DMAAP Topic Sink based on a topic.
- *
- * @param topic topic name
- * @throws IllegalArgumentException if invalid parameters are present
- */
- void destroy(String topic);
-
- /**
- * Destroys all DMAAP Topic Sinks.
- */
- void destroy();
-
- /**
- * Gets an DMAAP Topic Sink based on topic name.
- *
- * @param topic the topic name
- * @return an DMAAP Topic Sink with topic name
- * @throws IllegalArgumentException if an invalid topic is provided
- * @throws IllegalStateException if the DMAAP Topic Reader is an incorrect state
- */
- DmaapTopicSink get(String topic);
-
- /**
- * Provides a snapshot of the DMAAP Topic Sinks.
- *
- * @return a list of the DMAAP Topic Sinks
- */
- List<DmaapTopicSink> inventory();
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java
deleted file mode 100644
index 9893fa15..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-public interface DmaapTopicSource extends BusTopicSource {
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
deleted file mode 100644
index 7b1f185b..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
- * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import java.util.List;
-import java.util.Properties;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-
-/**
- * DMAAP Topic Source Factory.
- */
-public interface DmaapTopicSourceFactory {
-
- /**
- * Creates an DMAAP Topic Source based on properties files.
- *
- * @param properties Properties containing initialization values
- *
- * @return an DMAAP Topic Source
- * @throws IllegalArgumentException if invalid parameters are present
- */
- List<DmaapTopicSource> build(Properties properties);
-
- /**
- * Instantiates a new DMAAP Topic Source.
- *
- * @param busTopicParams parameters object
- * @return a DMAAP Topic Source
- */
- DmaapTopicSource build(BusTopicParams busTopicParams);
-
- /**
- * Instantiates a new DMAAP Topic Source.
- *
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- *
- * @return an DMAAP Topic Source
- * @throws IllegalArgumentException if invalid parameters are present
- */
- DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
-
- /**
- * Instantiates a new DMAAP Topic Source.
- *
- * @param servers list of servers
- * @param topic topic name
- *
- * @return an DMAAP Topic Source
- * @throws IllegalArgumentException if invalid parameters are present
- */
- DmaapTopicSource build(List<String> servers, String topic);
-
- /**
- * Destroys an DMAAP Topic Source based on a topic.
- *
- * @param topic topic name
- * @throws IllegalArgumentException if invalid parameters are present
- */
- void destroy(String topic);
-
- /**
- * Destroys all DMAAP Topic Sources.
- */
- void destroy();
-
- /**
- * Gets an DMAAP Topic Source based on topic name.
- *
- * @param topic the topic name
- * @return an DMAAP Topic Source with topic name
- * @throws IllegalArgumentException if an invalid topic is provided
- * @throws IllegalStateException if the DMAAP Topic Source is an incorrect state
- */
- DmaapTopicSource get(String topic);
-
- /**
- * Provides a snapshot of the DMAAP Topic Sources.
- *
- * @return a list of the DMAAP Topic Sources
- */
- List<DmaapTopicSource> inventory();
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java
deleted file mode 100644
index dfdadd1a..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import com.google.re2j.Pattern;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-import org.apache.commons.lang3.StringUtils;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-import org.onap.policy.common.endpoints.utils.DmaapPropertyUtils;
-import org.onap.policy.common.endpoints.utils.PropertyUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Factory of DMAAP Reader Topics indexed by topic name.
- */
-class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
-
- private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
- private static final String MISSING_TOPIC = "A topic must be provided";
-
- /**
- * Logger.
- */
- private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class);
-
- /**
- * DMAAP Topic Name Index.
- */
- protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>();
-
- @Override
- public DmaapTopicSink build(BusTopicParams busTopicParams) {
-
- if (StringUtils.isBlank(busTopicParams.getTopic())) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized (this) {
- if (dmaapTopicWriters.containsKey(busTopicParams.getTopic())) {
- return dmaapTopicWriters.get(busTopicParams.getTopic());
- }
-
- var dmaapTopicSink = makeSink(busTopicParams);
-
- if (busTopicParams.isManaged()) {
- dmaapTopicWriters.put(busTopicParams.getTopic(), dmaapTopicSink);
- }
- return dmaapTopicSink;
- }
- }
-
- @Override
- public DmaapTopicSink build(List<String> servers, String topic) {
- return this.build(BusTopicParams.builder()
- .servers(servers)
- .topic(topic)
- .managed(true)
- .useHttps(false)
- .allowSelfSignedCerts(false)
- .build());
- }
-
- @Override
- public List<DmaapTopicSink> build(Properties properties) {
-
- String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
- if (StringUtils.isBlank(writeTopics)) {
- logger.info("{}: no topic for DMaaP Sink", this);
- return new ArrayList<>();
- }
-
- List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>();
- synchronized (this) {
- for (String topic : COMMA_SPACE_PAT.split(writeTopics)) {
- addTopic(newDmaapTopicSinks, properties, topic);
- }
- return newDmaapTopicSinks;
- }
- }
-
- private void addTopic(List<DmaapTopicSink> newDmaapTopicSinks, Properties properties, String topic) {
- if (this.dmaapTopicWriters.containsKey(topic)) {
- newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
- return;
- }
-
- String topicPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic;
-
- var props = new PropertyUtils(properties, topicPrefix,
- (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
-
- String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
- if (StringUtils.isBlank(servers)) {
- logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
- return;
- }
-
- var dmaapTopicSink = this.build(DmaapPropertyUtils.makeBuilder(props, topic, servers)
- .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null))
- .build());
-
- newDmaapTopicSinks.add(dmaapTopicSink);
- }
-
- /**
- * Makes a new sink.
- *
- * @param busTopicParams parameters to use to configure the sink
- * @return a new sink
- */
- protected DmaapTopicSink makeSink(BusTopicParams busTopicParams) {
- return new InlineDmaapTopicSink(busTopicParams);
- }
-
- @Override
- public void destroy(String topic) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- DmaapTopicSink dmaapTopicWriter;
- synchronized (this) {
- if (!dmaapTopicWriters.containsKey(topic)) {
- return;
- }
-
- dmaapTopicWriter = dmaapTopicWriters.remove(topic);
- }
-
- dmaapTopicWriter.shutdown();
- }
-
- @Override
- public void destroy() {
- List<DmaapTopicSink> writers = this.inventory();
- for (DmaapTopicSink writer : writers) {
- writer.shutdown();
- }
-
- synchronized (this) {
- this.dmaapTopicWriters.clear();
- }
- }
-
- @Override
- public DmaapTopicSink get(String topic) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized (this) {
- if (dmaapTopicWriters.containsKey(topic)) {
- return dmaapTopicWriters.get(topic);
- } else {
- throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
- }
- }
- }
-
- @Override
- public synchronized List<DmaapTopicSink> inventory() {
- return new ArrayList<>(this.dmaapTopicWriters.values());
- }
-
- @Override
- public String toString() {
- return "IndexedDmaapTopicSinkFactory " + dmaapTopicWriters.keySet();
- }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java
deleted file mode 100644
index 66960b15..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import com.google.re2j.Pattern;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-import org.apache.commons.lang3.StringUtils;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-import org.onap.policy.common.endpoints.utils.DmaapPropertyUtils;
-import org.onap.policy.common.endpoints.utils.PropertyUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Factory of DMAAP Source Topics indexed by topic name.
- */
-
-class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
- private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
- private static final String MISSING_TOPIC = "A topic must be provided";
-
- /**
- * Logger.
- */
- private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
-
- /**
- * DMaaP Topic Name Index.
- */
- protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
-
- @Override
- public DmaapTopicSource build(BusTopicParams busTopicParams) {
-
- if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized (this) {
- if (dmaapTopicSources.containsKey(busTopicParams.getTopic())) {
- return dmaapTopicSources.get(busTopicParams.getTopic());
- }
-
- var dmaapTopicSource = makeSource(busTopicParams);
-
- if (busTopicParams.isManaged()) {
- dmaapTopicSources.put(busTopicParams.getTopic(), dmaapTopicSource);
- }
- return dmaapTopicSource;
- }
- }
-
- @Override
- public List<DmaapTopicSource> build(Properties properties) {
-
- String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
- if (StringUtils.isBlank(readTopics)) {
- logger.info("{}: no topic for DMaaP Source", this);
- return new ArrayList<>();
- }
-
- List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
- synchronized (this) {
- for (String topic : COMMA_SPACE_PAT.split(readTopics)) {
- addTopic(dmaapTopicSourceLst, properties, topic);
- }
- }
- return dmaapTopicSourceLst;
- }
-
- @Override
- public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
- return this.build(BusTopicParams.builder()
- .servers(servers)
- .topic(topic)
- .apiKey(apiKey)
- .apiSecret(apiSecret)
- .fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH)
- .fetchLimit(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH)
- .managed(true)
- .useHttps(false)
- .allowSelfSignedCerts(false)
- .build());
- }
-
- @Override
- public DmaapTopicSource build(List<String> servers, String topic) {
- return this.build(servers, topic, null, null);
- }
-
- private void addTopic(List<DmaapTopicSource> dmaapTopicSourceLst, Properties properties, String topic) {
- if (this.dmaapTopicSources.containsKey(topic)) {
- dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
- return;
- }
-
- String topicPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic;
-
- var props = new PropertyUtils(properties, topicPrefix,
- (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
-
- String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
- if (StringUtils.isBlank(servers)) {
- logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
- return;
- }
-
- DmaapTopicSource uebTopicSource = this.build(DmaapPropertyUtils.makeBuilder(props, topic, servers)
- .consumerGroup(props.getString(
- PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null))
- .consumerInstance(props.getString(
- PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null))
- .fetchTimeout(props.getInteger(
- PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX,
- PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH))
- .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX,
- PolicyEndPointProperties.DEFAULT_LIMIT_FETCH))
- .build());
-
- dmaapTopicSourceLst.add(uebTopicSource);
- }
-
- /**
- * Makes a new source.
- *
- * @param busTopicParams parameters to use to configure the source
- * @return a new source
- */
- protected DmaapTopicSource makeSource(BusTopicParams busTopicParams) {
- return new SingleThreadedDmaapTopicSource(busTopicParams);
- }
-
- @Override
- public void destroy(String topic) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- DmaapTopicSource uebTopicSource;
-
- synchronized (this) {
- if (!dmaapTopicSources.containsKey(topic)) {
- return;
- }
-
- uebTopicSource = dmaapTopicSources.remove(topic);
- }
-
- uebTopicSource.shutdown();
- }
-
- @Override
- public void destroy() {
- List<DmaapTopicSource> readers = this.inventory();
- for (DmaapTopicSource reader : readers) {
- reader.shutdown();
- }
-
- synchronized (this) {
- this.dmaapTopicSources.clear();
- }
- }
-
- @Override
- public DmaapTopicSource get(String topic) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized (this) {
- if (dmaapTopicSources.containsKey(topic)) {
- return dmaapTopicSources.get(topic);
- } else {
- throw new IllegalStateException("DmaapTopiceSource for " + topic + " not found");
- }
- }
- }
-
- @Override
- public synchronized List<DmaapTopicSource> inventory() {
- return new ArrayList<>(this.dmaapTopicSources.values());
- }
-
- @Override
- public String toString() {
- return "IndexedDmaapTopicSourceFactory " + dmaapTopicSources.keySet();
- }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
index d6fa0645..539a78c2 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
@@ -55,11 +55,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.jetbrains.annotations.NotNull;
-import org.onap.dmaap.mr.client.MRClientFactory;
-import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
-import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
-import org.onap.dmaap.mr.client.response.MRConsumerResponse;
-import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -377,254 +372,6 @@ public interface BusConsumer {
return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
}
}
-
- /**
- * MR based consumer.
- */
- public abstract class DmaapConsumerWrapper extends FetchingBusConsumer {
-
- /**
- * logger.
- */
- private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
-
- /**
- * Name of the "protocol" property.
- */
- protected static final String PROTOCOL_PROP = "Protocol";
-
- /**
- * MR Consumer.
- */
- protected MRConsumerImpl consumer;
-
- /**
- * MR Consumer Wrapper.
- *
- * <p>servers - messaging bus hosts
- * topic - topic
- * apiKey - API Key
- * apiSecret - API Secret
- * username - AAF Login
- * password - AAF Password
- * consumerGroup - Consumer Group
- * consumerInstance - Consumer Instance
- * fetchTimeout - Fetch Timeout
- * fetchLimit - Fetch Limit
- *
- * @param busTopicParams contains above listed attributes
- * @throws MalformedURLException URL should be valid
- */
- protected DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
- super(busTopicParams);
-
- if (busTopicParams.isTopicInvalid()) {
- throw new IllegalArgumentException("No topic for DMaaP");
- }
-
- this.consumer = new MRConsumerImplBuilder()
- .setHostPart(busTopicParams.getServers())
- .setTopic(busTopicParams.getTopic())
- .setConsumerGroup(busTopicParams.getConsumerGroup())
- .setConsumerId(busTopicParams.getConsumerInstance())
- .setTimeoutMs(busTopicParams.getFetchTimeout())
- .setLimit(busTopicParams.getFetchLimit())
- .setApiKey(busTopicParams.getApiKey())
- .setApiSecret(busTopicParams.getApiSecret())
- .createMRConsumerImpl();
-
- this.consumer.setUsername(busTopicParams.getUserName());
- this.consumer.setPassword(busTopicParams.getPassword());
- }
-
- @Override
- public Iterable<String> fetch() {
- final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
- if (response == null) {
- logger.warn("{}: DMaaP NULL response received", this);
-
- sleepAfterFetchFailure();
- return new ArrayList<>();
- } else {
- logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
- response.getResponseMessage());
-
- if (!"200".equals(response.getResponseCode())) {
-
- logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
- response.getResponseMessage());
-
- sleepAfterFetchFailure();
-
- /* fall through */
- }
- }
-
- if (response.getActualMessages() == null) {
- return new ArrayList<>();
- } else {
- return response.getActualMessages();
- }
- }
-
- @Override
- public void close() {
- super.close();
- this.consumer.close();
- }
-
- @Override
- public String toString() {
- return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
- + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
- + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
- + consumer.getUsername() + "]";
- }
- }
-
- /**
- * MR based consumer.
- */
- class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
-
- private static final Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
-
- /**
- * BusTopicParams contain the following parameters.
- * MR Consumer Wrapper.
- *
- * <p>servers messaging bus hosts
- * topic - topic
- * apiKey - API Key
- * apiSecret - API Secret
- * aafLogin - AAF Login
- * aafPassword - AAF Password
- * consumerGroup - Consumer Group
- * consumerInstance - Consumer Instance
- * fetchTimeout - Fetch Timeout
- * fetchLimit - Fetch Limit
- *
- * @param busTopicParams contains above listed params
- * @throws MalformedURLException URL should be valid
- */
- public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
-
- super(busTopicParams);
-
- // super constructor sets servers = {""} if empty to avoid errors when using DME2
- if (busTopicParams.isServersInvalid()) {
- throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
- }
-
- this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
-
- Properties props = new Properties();
-
- if (busTopicParams.isUseHttps()) {
- props.setProperty(PROTOCOL_PROP, "https");
- this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
-
- } else {
- props.setProperty(PROTOCOL_PROP, "http");
- this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
- }
-
- this.consumer.setProps(props);
- logger.info("{}: CREATION", this);
- }
-
- @Override
- public String toString() {
- final MRConsumerImpl consumer = this.consumer;
-
- return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
- + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
- + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
- + consumer.getUsername() + "]";
- }
- }
-
- class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
-
- private static final Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
-
- /**
- * Constructor.
- *
- * @param busTopicParams topic parameters
- * @throws MalformedURLException must provide a valid URL
- */
- public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
-
-
- super(busTopicParams);
-
-
- final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
- ? busTopicParams.getAdditionalProps().get(
- PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
- : null);
-
- BusHelper.validateBusTopicParams(busTopicParams, PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
-
- if ((busTopicParams.isPartnerInvalid())
- && StringUtils.isBlank(dme2RouteOffer)) {
- throw new IllegalArgumentException(
- "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
- + "." + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
- + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
- }
-
- final String serviceName = busTopicParams.getServers().get(0);
-
- this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
-
- this.consumer.setUsername(busTopicParams.getUserName());
- this.consumer.setPassword(busTopicParams.getPassword());
-
- Properties props = getProperties(busTopicParams, serviceName, dme2RouteOffer);
-
- MRClientFactory.prop = props;
- this.consumer.setProps(props);
-
- logger.info("{}: CREATION", this);
- }
-
- @NotNull
- private static Properties getProperties(BusTopicParams busTopicParams, String serviceName,
- String dme2RouteOffer) {
- Properties props = new Properties();
-
- props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
-
- props.setProperty("username", busTopicParams.getUserName());
- props.setProperty("password", busTopicParams.getPassword());
-
- /* These are required, no defaults */
- props.setProperty("topic", busTopicParams.getTopic());
-
- BusHelper.setCommonProperties(busTopicParams, dme2RouteOffer, props);
-
- props.setProperty("MethodType", "GET");
-
- if (busTopicParams.isUseHttps()) {
- props.setProperty(PROTOCOL_PROP, "https");
-
- } else {
- props.setProperty(PROTOCOL_PROP, "http");
- }
-
- props.setProperty("contenttype", "application/json");
-
- if (busTopicParams.isAdditionalPropsValid()) {
- props.putAll(busTopicParams.getAdditionalProps());
- }
- return props;
- }
- }
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusHelper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusHelper.java
deleted file mode 100644
index 298607b5..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusHelper.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP POLICY
- * ================================================================================
- * Copyright (C) 2023 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END============================================
- * ===================================================================
- *
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal;
-
-import java.util.Properties;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-
-public class BusHelper {
-
- private BusHelper() {
- /* no constructor */
- }
-
- /**
- * Complete the properties param with common fields for both BusConsumer and BusPublisher.
- * @param busTopicParams topics
- * @param dme2RouteOffer route
- * @param props properties
- */
- public static void setCommonProperties(BusTopicParams busTopicParams, String dme2RouteOffer, Properties props) {
- props.setProperty("Environment", busTopicParams.getEnvironment());
- props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
-
- if (busTopicParams.getPartner() != null) {
- props.setProperty("Partner", busTopicParams.getPartner());
- }
- if (dme2RouteOffer != null) {
- props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
- }
-
- props.setProperty("Latitude", busTopicParams.getLatitude());
- props.setProperty("Longitude", busTopicParams.getLongitude());
-
- /* These are optional, will default to these values if not set in additionalProps */
- props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
- props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
- props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
- props.setProperty("Version", "1.0");
- props.setProperty("SubContextPath", "/");
- props.setProperty("sessionstickinessrequired", "no");
-
- /* These should not change */
- props.setProperty("TransportType", "DME2");
- }
-
- /**
- * Throws exception when any of the checks are invalid.
- * @param busTopicParams topics
- * @param topicType topic type (sink or source)
- */
- public static void validateBusTopicParams(BusTopicParams busTopicParams, String topicType) {
- if (busTopicParams.isEnvironmentInvalid()) {
- throw paramException(busTopicParams.getTopic(), topicType,
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
- }
- if (busTopicParams.isAftEnvironmentInvalid()) {
- throw paramException(busTopicParams.getTopic(), topicType,
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
- }
- if (busTopicParams.isLatitudeInvalid()) {
- throw paramException(busTopicParams.getTopic(), topicType,
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
- }
- if (busTopicParams.isLongitudeInvalid()) {
- throw paramException(busTopicParams.getTopic(), topicType,
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
- }
- }
-
- private static IllegalArgumentException paramException(String topic, String topicType, String propertyName) {
- return new IllegalArgumentException("Missing " + topicType + "."
- + topic + propertyName + " property for DME2 in DMaaP");
-
- }
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
index def8f841..e2adde0d 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
@@ -5,7 +5,7 @@
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
* Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
- * Modifications Copyright (C) 2022-2023 Nordix Foundation.
+ * Modifications Copyright (C) 2022-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,21 +29,12 @@ import com.att.nsa.cambria.client.CambriaClientBuilders;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
import java.net.MalformedURLException;
import java.security.GeneralSecurityException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
import java.util.Properties;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
-import org.onap.dmaap.mr.client.response.MRPublisherResponse;
-import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -239,202 +230,4 @@ public interface BusPublisher {
}
}
-
- /**
- * DmaapClient library wrapper.
- */
- abstract class DmaapPublisherWrapper implements BusPublisher {
-
- private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
-
- /**
- * MR based Publisher.
- */
- protected MRSimplerBatchPublisher publisher;
- protected Properties props;
-
- /**
- * MR Publisher Wrapper.
- *
- * @param servers messaging bus hosts
- * @param topic topic
- * @param username AAF or DME2 Login
- * @param password AAF or DME2 Password
- */
- protected DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
- String username, String password, boolean useHttps) {
-
- if (StringUtils.isBlank(topic)) {
- throw new IllegalArgumentException("No topic for DMaaP");
- }
-
- configureProtocol(topic, protocol, servers, useHttps);
-
- this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
-
- this.publisher.setUsername(username);
- this.publisher.setPassword(password);
-
- props = new Properties();
-
- props.setProperty("Protocol", (useHttps ? "https" : "http"));
- props.setProperty("contenttype", "application/json");
- props.setProperty("username", username);
- props.setProperty("password", password);
-
- props.setProperty("topic", topic);
-
- this.publisher.setProps(props);
-
- if (protocol == ProtocolTypeConstants.AAF_AUTH) {
- this.publisher.setHost(servers.get(0));
- }
-
- logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
- }
-
- private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers,
- boolean useHttps) {
-
- if (protocol == ProtocolTypeConstants.AAF_AUTH) {
- if (servers == null || servers.isEmpty()) {
- throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
- }
-
- ArrayList<String> dmaapServers = new ArrayList<>();
- String port = useHttps ? ":3905" : ":3904";
- for (String server : servers) {
- dmaapServers.add(server + port);
- }
-
- this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
-
- this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
-
- } else if (protocol == ProtocolTypeConstants.DME2) {
- ArrayList<String> dmaapServers = new ArrayList<>();
- dmaapServers.add("0.0.0.0:3904");
-
- this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
-
- this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
-
- } else {
- throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
- }
- }
-
- @Override
- public void close() {
- logger.info(LOG_CLOSE, this);
-
- try {
- this.publisher.close(1, TimeUnit.SECONDS);
-
- } catch (InterruptedException e) {
- logger.warn(LOG_CLOSE_FAILED, this, e);
- Thread.currentThread().interrupt();
-
- } catch (Exception e) {
- logger.warn(LOG_CLOSE_FAILED, this, e);
- }
- }
-
- @Override
- public boolean send(String partitionId, String message) {
- if (message == null) {
- throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
- }
-
- this.publisher.setPubResponse(new MRPublisherResponse());
- this.publisher.send(partitionId, message);
- MRPublisherResponse response = this.publisher.sendBatchWithResponse();
- if (response != null) {
- logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(),
- response.getResponseMessage());
- }
-
- return true;
- }
-
- @Override
- public String toString() {
- return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
- + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
- + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
- + ", publisher.getUsername()=" + publisher.getUsername() + "]";
- }
- }
-
- /**
- * DmaapClient library wrapper.
- */
- class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
- /**
- * MR based Publisher.
- */
- public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
- boolean useHttps) {
-
- super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
- }
- }
-
- class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
-
- /**
- * Constructor.
- *
- * @param busTopicParams topic parameters
- */
- public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) {
-
- super(ProtocolTypeConstants.DME2, busTopicParams.getServers(), busTopicParams.getTopic(),
- busTopicParams.getUserName(), busTopicParams.getPassword(), busTopicParams.isUseHttps());
-
- String dme2RouteOffer = busTopicParams.isAdditionalPropsValid()
- ? busTopicParams.getAdditionalProps().get(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
- : null;
-
- validateParams(busTopicParams, dme2RouteOffer);
-
- String serviceName = busTopicParams.getServers().get(0);
-
- /* These are required, no defaults */
- props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
-
- BusHelper.setCommonProperties(busTopicParams, dme2RouteOffer, props);
-
- props.setProperty("MethodType", "POST");
-
- if (busTopicParams.isAdditionalPropsValid()) {
- addAdditionalProps(busTopicParams);
- }
-
- this.publisher.setProps(props);
- }
-
- private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) {
- BusHelper.validateBusTopicParams(busTopicParams, PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
-
- if ((busTopicParams.isPartnerInvalid()) && StringUtils.isBlank(dme2RouteOffer)) {
- throw new IllegalArgumentException("Must provide at least "
- + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
- + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
- }
- }
-
- private void addAdditionalProps(BusTopicParams busTopicParams) {
- for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
-
- if (value != null) {
- props.setProperty(key, value);
- }
- }
- }
- }
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
index 7cc8f8b6..53a6ab66 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
@@ -4,7 +4,7 @@
* ================================================================================
* Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved.
* Modifications Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2019, 2023 Nordix Foundation.
+ * Modifications Copyright (C) 2019, 2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,14 +33,14 @@ import org.apache.commons.lang3.StringUtils;
/**
* Member variables of this Params class are as follows.
*
- * <p>servers DMaaP servers
- * topic DMaaP Topic to be monitored
- * apiKey DMaaP API Key (optional)
- * apiSecret DMaaP API Secret (optional)
- * consumerGroup DMaaP Reader Consumer Group
- * consumerInstance DMaaP Reader Instance
- * fetchTimeout DMaaP fetch timeout
- * fetchLimit DMaaP fetch limit
+ * <p>servers Kafka servers
+ * topic Kafka Topic to be monitored
+ * apiKey Kafka API Key (optional)
+ * apiSecret Kafka API Secret (optional)
+ * consumerGroup kafka Reader Consumer Group
+ * consumerInstance Kafka Reader Instance
+ * fetchTimeout kafka fetch timeout
+ * fetchLimit Kafka fetch limit
* environment DME2 Environment
* aftEnvironment DME2 AFT Environment
* partner DME2 Partner
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
index 7c740abf..dfdc7b3c 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
@@ -5,7 +5,7 @@
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
* Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
- * Modifications Copyright (C) 2023 Nordix Foundation.
+ * Modifications Copyright (C) 2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
/**
* Transport Agnostic Bus Topic Sink to carry out the core functionality to interact with a sink
- * regardless if it is UEB or DMaaP.
+ * regardless if it is UEB or Kafka.
*
*/
public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java
deleted file mode 100644
index 771efb33..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
- * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal;
-
-import java.util.Map;
-import org.onap.policy.common.endpoints.event.comm.Topic;
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This implementation publishes events for the associated DMAAP topic, inline with the calling
- * thread.
- */
-public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTopicSink {
-
- protected static Logger logger = LoggerFactory.getLogger(InlineDmaapTopicSink.class);
-
- protected final String userName;
- protected final String password;
-
- protected String environment = null;
- protected String aftEnvironment = null;
- protected String partner = null;
- protected String latitude = null;
- protected String longitude = null;
-
- protected Map<String, String> additionalProps = null;
-
- /**
- * BusTopicParams contains the below mentioned attributes.
- * servers DMaaP servers
- * topic DMaaP Topic to be monitored
- * apiKey DMaaP API Key (optional)
- * apiSecret DMaaP API Secret (optional)
- * environment DME2 Environment
- * aftEnvironment DME2 AFT Environment
- * partner DME2 Partner
- * latitude DME2 Latitude
- * longitude DME2 Longitude
- * additionalProps Additional properties to pass to DME2
- * useHttps does connection use HTTPS?
- * allowTracing is tracing allowed?
- * allowSelfSignedCerts are self-signed certificates allow
- * @param busTopicParams Contains the above mentioned parameters
- * @throws IllegalArgumentException An invalid parameter passed in
- */
- public InlineDmaapTopicSink(BusTopicParams busTopicParams) {
-
- super(busTopicParams);
-
- this.userName = busTopicParams.getUserName();
- this.password = busTopicParams.getPassword();
-
- this.environment = busTopicParams.getEnvironment();
- this.aftEnvironment = busTopicParams.getAftEnvironment();
- this.partner = busTopicParams.getPartner();
-
- this.latitude = busTopicParams.getLatitude();
- this.longitude = busTopicParams.getLongitude();
-
- this.additionalProps = busTopicParams.getAdditionalProps();
- }
-
-
- @Override
- public void init() {
- if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
- this.publisher = new BusPublisher.CambriaPublisherWrapper(BusTopicParams.builder()
- .servers(this.servers)
- .topic(this.effectiveTopic)
- .apiKey(this.apiKey)
- .apiSecret(this.apiSecret)
- .userName(this.userName)
- .password(this.password)
- .useHttps(this.useHttps)
- .allowTracing(this.allowTracing)
- .allowSelfSignedCerts(this.allowSelfSignedCerts)
- .build());
- } else {
- this.publisher = new BusPublisher.DmaapDmePublisherWrapper(BusTopicParams.builder()
- .servers(this.servers)
- .topic(this.effectiveTopic)
- .userName(this.userName)
- .password(this.password)
- .environment(this.environment)
- .aftEnvironment(this.aftEnvironment)
- .partner(this.partner)
- .latitude(this.latitude)
- .longitude(this.longitude)
- .additionalProps(this.additionalProps)
- .useHttps(this.useHttps)
- .allowTracing(this.allowTracing)
- .build());
- }
-
- logger.info("{}: DMAAP SINK created", this);
- }
-
- @Override
- public CommInfrastructure getTopicCommInfrastructure() {
- return Topic.CommInfrastructure.DMAAP;
- }
-
-
- @Override
- public String toString() {
- return "InlineDmaapTopicSink [userName=" + userName + ", password=" + password
- + ", getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()=" + super.toString()
- + "]";
- }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
deleted file mode 100644
index 26960379..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal;
-
-import java.net.MalformedURLException;
-import java.util.Map;
-import org.onap.policy.common.endpoints.event.comm.Topic;
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This topic reader implementation specializes in reading messages over DMAAP topic and notifying
- * its listeners.
- */
-public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource implements DmaapTopicSource, Runnable {
-
- private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class);
-
-
- protected final String userName;
- protected final String password;
-
- protected String environment = null;
- protected String aftEnvironment = null;
- protected String partner = null;
- protected String latitude = null;
- protected String longitude = null;
-
- protected Map<String, String> additionalProps = null;
-
-
- /**
- * Constructor.
- *
- * @param busTopicParams Parameters object containing all the required inputs
- *
- * @throws IllegalArgumentException An invalid parameter passed in
- */
- public SingleThreadedDmaapTopicSource(BusTopicParams busTopicParams) {
-
- super(busTopicParams);
-
- this.userName = busTopicParams.getUserName();
- this.password = busTopicParams.getPassword();
-
- this.environment = busTopicParams.getEnvironment();
- this.aftEnvironment = busTopicParams.getAftEnvironment();
- this.partner = busTopicParams.getPartner();
-
- this.latitude = busTopicParams.getLatitude();
- this.longitude = busTopicParams.getLongitude();
-
- this.additionalProps = busTopicParams.getAdditionalProps();
- try {
- this.init();
- } catch (Exception e) {
- throw new IllegalArgumentException("ERROR during init in dmaap-source: cannot create topic " + topic, e);
- }
- }
-
-
- /**
- * Initialize the Cambria or MR Client.
- */
- @Override
- public void init() throws MalformedURLException {
- BusTopicParams.TopicParamsBuilder builder = BusTopicParams.builder()
- .servers(this.servers)
- .topic(this.effectiveTopic)
- .apiKey(this.apiKey)
- .apiSecret(this.apiSecret)
- .consumerGroup(this.consumerGroup)
- .consumerInstance(this.consumerInstance)
- .fetchTimeout(this.fetchTimeout)
- .fetchLimit(this.fetchLimit)
- .useHttps(this.useHttps)
- .allowTracing(this.allowTracing);
-
- if (anyNullOrEmpty(this.userName, this.password)) {
- this.consumer = new BusConsumer.CambriaConsumerWrapper(builder
- .allowSelfSignedCerts(this.allowSelfSignedCerts)
- .build());
- } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
- this.consumer = new BusConsumer.CambriaConsumerWrapper(builder
- .userName(this.userName)
- .password(this.password)
- .allowSelfSignedCerts(this.allowSelfSignedCerts)
- .build());
- } else {
- this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(builder
- .userName(this.userName)
- .password(this.password)
- .environment(this.environment)
- .aftEnvironment(this.aftEnvironment)
- .partner(this.partner)
- .latitude(this.latitude)
- .longitude(this.longitude)
- .additionalProps(this.additionalProps)
- .build());
- }
-
- logger.info("{}: INITTED", this);
- }
-
- @Override
- public CommInfrastructure getTopicCommInfrastructure() {
- return Topic.CommInfrastructure.DMAAP;
- }
-
- @Override
- public String toString() {
- return "SingleThreadedDmaapTopicSource [userName=" + userName
- + ", password=" + (password == null || password.isEmpty() ? "-" : password.length())
- + ", getTopicCommInfrastructure()=" + getTopicCommInfrastructure()
- + ", toString()=" + super.toString() + "]";
- }
-
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/AuthorizationFilter.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/AuthorizationFilter.java
index b58cde7c..44204cfd 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/AuthorizationFilter.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/AuthorizationFilter.java
@@ -3,7 +3,7 @@
* ONAP
* ================================================================================
* Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2023 Nordix Foundation.
+ * Modifications Copyright (C) 2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -40,17 +40,14 @@ public abstract class AuthorizationFilter implements Filter {
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
throws IOException, ServletException {
- if (!(servletRequest instanceof HttpServletRequest)) {
+ if (!(servletRequest instanceof HttpServletRequest request)) {
throw new ServletException("Not an HttpServletRequest instance");
}
- if (!(servletResponse instanceof HttpServletResponse)) {
+ if (!(servletResponse instanceof HttpServletResponse response)) {
throw new ServletException("Not an HttpServletResponse instance");
}
- HttpServletRequest request = (HttpServletRequest) servletRequest;
- HttpServletResponse response = (HttpServletResponse) servletResponse;
-
String role = getRole(request);
boolean authorized = request.isUserInRole(role);
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServer.java
index 23c2b54a..a20c125d 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServer.java
@@ -3,7 +3,7 @@
* ONAP
* ================================================================================
* Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2020 Nordix Foundation.
+ * Modifications Copyright (C) 2020, 2024 Nordix Foundation.
* Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -52,18 +52,6 @@ public interface HttpServletServer extends Startable {
void setBasicAuthentication(String user, String password, String relativeUriPath);
/**
- * Enables AAF based authentication.
- *
- * @param filterPath filter path
- */
- void setAafAuthentication(String filterPath);
-
- /**
- * Checks if AAF authentication has been enabled.
- */
- boolean isAaf();
-
- /**
* Sets the serialization provider to be used when classes are added to the service.
*
* @param provider the provider to use for message serialization and de-serialization
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java
index 2f557946..7c9aca4c 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java
@@ -3,7 +3,7 @@
* ONAP Policy Engine - Common Modules
* ================================================================================
* Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2020,2023 Nordix Foundation.
+ * Modifications Copyright (C) 2020,2023-2024 Nordix Foundation.
* Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -133,7 +133,7 @@ class IndexedHttpServletServerFactory implements HttpServletServerFactory {
// configure the service
setSerializationProvider(props, service);
- setAuthentication(props, service, contextUriPath);
+ setAuthentication(props, service);
final var restUriPath = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_REST_URIPATH_SUFFIX, null);
@@ -156,17 +156,13 @@ class IndexedHttpServletServerFactory implements HttpServletServerFactory {
}
}
- private void setAuthentication(PropertyUtils props, HttpServletServer service, final String contextUriPath) {
- /* authentication method either AAF or HTTP Basic Auth */
-
- final var aaf = props.getBoolean(PolicyEndPointProperties.PROPERTY_AAF_SUFFIX, false);
+ private void setAuthentication(PropertyUtils props, HttpServletServer service) {
+ /* authentication method HTTP Basic Auth */
final var userName = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, null);
final var password = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, null);
final var authUriPath = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_AUTH_URIPATH_SUFFIX, null);
- if (aaf) {
- service.setAafAuthentication(contextUriPath);
- } else if (!StringUtils.isBlank(userName) && !StringUtils.isBlank(password)) {
+ if (!StringUtils.isBlank(userName) && !StringUtils.isBlank(password)) {
service.setBasicAuthentication(userName, password, authUriPath);
}
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/RestServer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/RestServer.java
index e7924771..7e6ce866 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/RestServer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/RestServer.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2019,2023 Nordix Foundation.
+ * Copyright (C) 2019, 2023-2024 Nordix Foundation.
* Modifications Copyright (C) 2019-2021 AT&T Intellectual Property.
* Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
* ================================================================================
@@ -29,7 +29,6 @@ import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import lombok.ToString;
-import org.onap.policy.common.endpoints.http.server.aaf.AafAuthFilter;
import org.onap.policy.common.endpoints.parameters.RestServerParameters;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.common.gson.GsonMessageBodyHandler;
@@ -54,21 +53,11 @@ public class RestServer extends ServiceManagerContainer {
* Constructs the object.
*
* @param restServerParameters the rest server parameters
- * @param aafFilter class of object to use to filter AAF requests, or {@code null}
* @param jaxrsProviders classes providing the services
*/
- public RestServer(final RestServerParameters restServerParameters, Class<? extends AafAuthFilter> aafFilter,
+ public RestServer(final RestServerParameters restServerParameters,
Class<?>... jaxrsProviders) {
-
- this(restServerParameters, makeFilterList(aafFilter), Arrays.asList(jaxrsProviders));
- }
-
- private static List<Class<? extends Filter>> makeFilterList(Class<? extends AafAuthFilter> aafFilter) {
- if (aafFilter == null) {
- return List.of();
- } else {
- return List.of(aafFilter);
- }
+ this(restServerParameters, null, Arrays.asList(jaxrsProviders));
}
/**
@@ -81,7 +70,7 @@ public class RestServer extends ServiceManagerContainer {
public RestServer(final RestServerParameters restServerParameters, List<Class<? extends Filter>> filters,
List<Class<?>> jaxrsProviders) {
- if (jaxrsProviders.isEmpty()) {
+ if (jaxrsProviders == null || jaxrsProviders.isEmpty()) {
throw new IllegalArgumentException("no providers specified");
}
@@ -89,12 +78,9 @@ public class RestServer extends ServiceManagerContainer {
.build(getServerProperties(restServerParameters, getProviderClassNames(jaxrsProviders)));
for (HttpServletServer server : this.servers) {
- for (Class<? extends Filter> filter : filters) {
- if (!AafAuthFilter.class.isAssignableFrom(filter) || server.isAaf()) {
- server.addFilterClass(null, filter.getName());
- }
+ if (filters != null && !filters.isEmpty()) {
+ filters.forEach(filter -> server.addFilterClass(null, filter.getName()));
}
-
addAction("REST " + server.getName(), server::start, server::stop);
}
}
@@ -128,8 +114,6 @@ public class RestServer extends ServiceManagerContainer {
String.valueOf(restServerParameters.isHttps()));
props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SNI_HOST_CHECK_SUFFIX,
String.valueOf(restServerParameters.isSniHostCHeck()));
- props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_AAF_SUFFIX,
- String.valueOf(restServerParameters.isAaf()));
props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER,
String.join(",", GsonMessageBodyHandler.class.getName(), YamlMessageBodyHandler.class.getName(),
JsonExceptionMapper.class.getName(), YamlExceptionMapper.class.getName()));
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/aaf/AafAuthFilter.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/aaf/AafAuthFilter.java
deleted file mode 100644
index 084d2fb9..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/aaf/AafAuthFilter.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2023 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.http.server.aaf;
-
-import jakarta.servlet.http.HttpServletRequest;
-import org.onap.policy.common.endpoints.http.server.AuthorizationFilter;
-
-/**
- * Generic Authorization AAF Filter Skeleton. This class will return
- * a permission in AAF format. Subclasses are responsible to provide
- * the AAF permission type and instance.
- */
-public abstract class AafAuthFilter extends AuthorizationFilter {
-
- public static final String DEFAULT_NAMESPACE = "org.onap.policy";
-
- @Override
- protected String getRole(HttpServletRequest request) {
- return
- String.format("%s|%s|%s", getPermissionType(request), getPermissionInstance(request),
- request.getMethod().toLowerCase());
- }
-
- protected abstract String getPermissionType(HttpServletRequest request);
-
- protected abstract String getPermissionInstance(HttpServletRequest request);
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/aaf/AafGranularAuthFilter.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/aaf/AafGranularAuthFilter.java
deleted file mode 100644
index 39524e87..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/aaf/AafGranularAuthFilter.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2023 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.http.server.aaf;
-
-import jakarta.servlet.http.HttpServletRequest;
-import org.onap.policy.common.utils.network.NetworkUtil;
-
-/**
- * This generic class allows the mapping of REST APIs to AAF permissions
- * to be evaluated in an AAF context. This class can be used for
- * highly granular permissions where each REST resource can be directly
- * mapped transparently to an AAF permission type, the instance being the host
- * server, and the HTTP method corresponding to the action.
- * Subclasses are responsible to provide the root permission prefix, typically
- * the namespace.
- */
-public abstract class AafGranularAuthFilter extends AafAuthFilter {
-
- @Override
- protected String getPermissionType(HttpServletRequest request) {
- return getPermissionTypeRoot() + request.getRequestURI().replace('/', '.');
- }
-
- @Override
- protected String getPermissionInstance(HttpServletRequest request) {
- return NetworkUtil.getHostname();
- }
-
- public abstract String getPermissionTypeRoot();
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyServletServer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyServletServer.java
index 4e1eda9f..78858a77 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyServletServer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyServletServer.java
@@ -3,7 +3,7 @@
* ONAP
* ================================================================================
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2019-2020,2023 Nordix Foundation.
+ * Modifications Copyright (C) 2019-2020, 2023-2024 Nordix Foundation.
* Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -43,13 +43,11 @@ import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.Slf4jRequestLogWriter;
-import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.security.Constraint;
import org.eclipse.jetty.util.security.Credential;
import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.onap.aaf.cadi.filter.CadiFilter;
import org.onap.policy.common.endpoints.http.server.HttpServletServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -284,21 +282,6 @@ public abstract class JettyServletServer implements HttpServletServer, Runnable
}
@Override
- public void setAafAuthentication(String filterPath) {
- this.addFilterClass(filterPath, CadiFilter.class.getName());
- }
-
- @Override
- public boolean isAaf() {
- for (FilterHolder filter : context.getServletHandler().getFilters()) {
- if (CadiFilter.class.getName().equals(filter.getClassName())) {
- return true;
- }
- }
- return false;
- }
-
- @Override
public void setBasicAuthentication(String user, String password, String servletPath) {
String srvltPath = servletPath;
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyStaticResourceServer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyStaticResourceServer.java
index 70ac1417..ee2b0540 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyStaticResourceServer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyStaticResourceServer.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2020,2023 Nordix Foundation.
+ * Copyright (C) 2020, 2023-2024 Nordix Foundation.
* Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -29,7 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Jetty Server that uses DefaultServlets to support web static resources management.
+ * Jetty Server that uses DefaultServlets to support web static resources' management.
*/
@ToString
public class JettyStaticResourceServer extends JettyServletServer {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/TopicParameterGroup.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/TopicParameterGroup.java
index e8a3dc20..d63134bc 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/TopicParameterGroup.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/parameters/TopicParameterGroup.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2019, 2024 Nordix Foundation.
* Modifications Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -58,15 +58,15 @@ public class TopicParameterGroup extends ParameterGroupImpl {
if (result.isValid()) {
var errorMsg = new StringBuilder();
StringBuilder missingSourceParams = checkMissingMandatoryParams(topicSources);
- if (missingSourceParams.length() > 0) {
+ if (!missingSourceParams.isEmpty()) {
errorMsg.append(missingSourceParams.append("missing in topicSources. "));
}
StringBuilder missingSinkParams = checkMissingMandatoryParams(topicSinks);
- if (missingSinkParams.length() > 0) {
+ if (!missingSinkParams.isEmpty()) {
errorMsg.append(missingSinkParams.append("missing in topicSinks."));
}
- if (errorMsg.length() > 0) {
+ if (!errorMsg.isEmpty()) {
errorMsg.insert(0, "Mandatory parameters are missing. ");
result.setResult(ValidationStatus.INVALID, errorMsg.toString());
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java
index b7f854af..4b4bf502 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java
@@ -3,7 +3,7 @@
* ONAP
* ================================================================================
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2022,2023 Nordix Foundation.
+ * Copyright (C) 2022,2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,8 +33,6 @@ public final class PolicyEndPointProperties {
public static final String PROPERTY_TOPIC_TOPICS_SUFFIX = ".topics";
public static final String PROPERTY_TOPIC_API_KEY_SUFFIX = ".apiKey";
public static final String PROPERTY_TOPIC_API_SECRET_SUFFIX = ".apiSecret";
- public static final String PROPERTY_TOPIC_AAF_MECHID_SUFFIX = ".aafMechId";
- public static final String PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX = ".aafPassword"; //NOSONAR
public static final String PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX = ".effectiveTopic";
public static final String PROPERTY_TOPIC_EVENTS_SUFFIX = ".events";
public static final String PROPERTY_TOPIC_EVENTS_FILTER_SUFFIX = ".filter";
@@ -45,7 +43,6 @@ public final class PolicyEndPointProperties {
public static final String PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX = ".fetchTimeout";
public static final String PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX = ".fetchLimit";
public static final String PROPERTY_MANAGED_SUFFIX = ".managed";
- public static final String PROPERTY_AAF_SUFFIX = ".aaf";
public static final String PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX = ".partitionKey";
@@ -56,27 +53,6 @@ public final class PolicyEndPointProperties {
public static final String PROPERTY_UEB_SOURCE_TOPICS = "ueb.source.topics";
public static final String PROPERTY_UEB_SINK_TOPICS = "ueb.sink.topics";
- /* DMAAP Properties */
-
- public static final String PROPERTY_DMAAP_SOURCE_TOPICS = "dmaap.source.topics";
- public static final String PROPERTY_DMAAP_SINK_TOPICS = "dmaap.sink.topics";
-
- public static final String PROPERTY_DMAAP_DME2_PARTNER_SUFFIX = ".dme2.partner";
- public static final String PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX = ".dme2.routeOffer";
- public static final String PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX = ".dme2.environment";
- public static final String PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX = ".dme2.aft.environment";
- public static final String PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX = ".dme2.latitude";
- public static final String PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX = ".dme2.longitude";
-
- public static final String PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX = ".dme2.epReadTimeoutMs";
- public static final String PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX = ".dme2.epConnTimeout";
- public static final String PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX = ".dme2.roundtripTimeoutMs";
- public static final String PROPERTY_DMAAP_DME2_VERSION_SUFFIX = ".dme2.version";
- public static final String PROPERTY_DMAAP_DME2_SERVICE_NAME_SUFFIX = ".dme2.serviceName";
- public static final String PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX = ".dme2.subContextPath";
- public static final String PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX =
- ".dme2.sessionStickinessRequired";
-
public static final String PROPERTY_NOOP_SOURCE_TOPICS = "noop.source.topics";
public static final String PROPERTY_NOOP_SINK_TOPICS = "noop.sink.topics";
@@ -118,18 +94,6 @@ public final class PolicyEndPointProperties {
public static final String PROPERTY_HTTP_URL_SUFFIX = PROPERTY_HTTP_CONTEXT_URIPATH_SUFFIX;
-
- /* DMaaP DME2 Topic Properties */
-
- public static final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
- public static final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
- public static final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
- public static final String DME2_VERSION_PROPERTY = "Version";
- public static final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
- public static final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
- public static final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
- public static final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
-
/* Topic Sink Values */
/**
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/DmaapPropertyUtils.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/DmaapPropertyUtils.java
deleted file mode 100644
index 3b35197a..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/DmaapPropertyUtils.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.utils;
-
-import com.google.re2j.Pattern;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.commons.lang3.StringUtils;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class DmaapPropertyUtils {
- private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
-
- /**
- * Maps a topic property to a DME property.
- */
- private static final Map<String, String> PROP_TO_DME;
-
- static {
- Map<String, String> map = new HashMap<>();
-
- map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX,
- PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY);
-
- map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX,
- PolicyEndPointProperties.DME2_READ_TIMEOUT_PROPERTY);
-
- map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX,
- PolicyEndPointProperties.DME2_EP_CONN_TIMEOUT_PROPERTY);
-
- map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX,
- PolicyEndPointProperties.DME2_ROUNDTRIP_TIMEOUT_PROPERTY);
-
- map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX,
- PolicyEndPointProperties.DME2_VERSION_PROPERTY);
-
- map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX,
- PolicyEndPointProperties.DME2_SUBCONTEXT_PATH_PROPERTY);
-
- map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX,
- PolicyEndPointProperties.DME2_SESSION_STICKINESS_REQUIRED_PROPERTY);
-
- PROP_TO_DME = Collections.unmodifiableMap(map);
- }
-
- /**
- * Makes a topic builder, configuring it with properties that are common to both
- * sources and sinks.
- *
- * @param props properties to be used to configure the builder
- * @param topic topic being configured
- * @param servers target servers
- * @return a topic builder
- */
- public static TopicParamsBuilder makeBuilder(PropertyUtils props, String topic, String servers) {
-
- /* Additional DME2 Properties */
-
- Map<String, String> dme2AdditionalProps = new HashMap<>();
-
- for (Map.Entry<String, String> ent : PROP_TO_DME.entrySet()) {
- String propName = ent.getKey();
- var value = props.getString(propName, null);
-
- if (!StringUtils.isBlank(value)) {
- String dmeName = ent.getValue();
- dme2AdditionalProps.put(dmeName, value);
- }
- }
-
- final List<String> serverList = new ArrayList<>(Arrays.asList(COMMA_SPACE_PAT.split(servers)));
-
- return BusTopicParams.builder()
- .servers(serverList)
- .topic(topic)
- .effectiveTopic(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX,
- topic))
- .apiKey(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX, null))
- .apiSecret(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX, null))
- .userName(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX, null))
- .password(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX, null))
- .environment(props.getString(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX,
- null))
- .aftEnvironment(props.getString(
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX, null))
- .partner(props.getString(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX, null))
- .latitude(props.getString(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX, null))
- .longitude(props.getString(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX, null))
- .additionalProps(dme2AdditionalProps)
- .managed(props.getBoolean(PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, true))
- .useHttps(props.getBoolean(PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX, false))
- .allowSelfSignedCerts(props.getBoolean(
- PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX, false));
- }
-}