aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java14
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java34
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactories.java43
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java11
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java9
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java10
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java32
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java8
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java8
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactories.java43
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java5
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSource.java5
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java43
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java10
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java10
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java43
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java10
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java9
18 files changed, 212 insertions, 135 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java
index ed796585..e77beea1 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,21 +26,17 @@ import org.onap.policy.common.endpoints.event.comm.TopicSink;
* Topic Sink over Bus Infrastructure (DMAAP/UEB).
*/
public interface BusTopicSink extends ApiKeyEnabled, TopicSink {
- /**
- * Log Failures after X number of retries.
- */
- public static final int DEFAULT_LOG_SEND_FAILURES_AFTER = 1;
/**
* Sets the UEB partition key for published messages.
- *
+ *
* @param partitionKey the partition key
*/
public void setPartitionKey(String partitionKey);
/**
* Return the partition key in used by the system to publish messages.
- *
+ *
* @return the partition key
*/
public String getPartitionKey();
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java
index 5ab4d46f..cd9bc015 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -29,49 +29,29 @@ import org.onap.policy.common.endpoints.event.comm.TopicSource;
public interface BusTopicSource extends ApiKeyEnabled, TopicSource {
/**
- * Default Timeout fetching in milliseconds.
- */
- public static int DEFAULT_TIMEOUT_MS_FETCH = 15000;
-
- /**
- * Default maximum number of messages fetch at the time.
- */
- public static int DEFAULT_LIMIT_FETCH = 100;
-
- /**
- * Definition of No Timeout fetching.
- */
- public static int NO_TIMEOUT_MS_FETCH = -1;
-
- /**
- * Definition of No limit fetching.
- */
- public static int NO_LIMIT_FETCH = -1;
-
- /**
* Gets the consumer group.
- *
+ *
* @return consumer group
*/
public String getConsumerGroup();
/**
* Gets the consumer instance.
- *
+ *
* @return consumer instance
*/
public String getConsumerInstance();
/**
* Gets the fetch timeout.
- *
+ *
* @return fetch timeout
*/
public int getFetchTimeout();
/**
* Gets the fetch limit.
- *
+ *
* @return fetch limit
*/
public int getFetchLimit();
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactories.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactories.java
new file mode 100644
index 00000000..d5a46f8f
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactories.java
@@ -0,0 +1,43 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import lombok.Getter;
+
+public class DmaapTopicFactories {
+
+ /**
+ * Factory for instantiation and management of sinks.
+ */
+ @Getter
+ private static final DmaapTopicSinkFactory sinkFactory = new IndexedDmaapTopicSinkFactory();
+
+ /**
+ * Factory for instantiation and management of sources.
+ */
+ @Getter
+ private static final DmaapTopicSourceFactory sourceFactory = new IndexedDmaapTopicSourceFactory();
+
+
+ private DmaapTopicFactories() {
+ // do nothing
+ }
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java
index fc1587e4..805ed108 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,9 +22,4 @@ package org.onap.policy.common.endpoints.event.comm.bus;
public interface DmaapTopicSink extends BusTopicSink {
- /**
- * Factory of UebTopicWriter for instantiation and management purposes.
- */
-
- public static final DmaapTopicSinkFactory factory = new IndexedDmaapTopicSinkFactory();
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java
index a3eb4df6..4409e827 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java
@@ -30,15 +30,6 @@ import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
*/
public interface DmaapTopicSinkFactory {
- String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
- String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
- String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
- String DME2_VERSION_PROPERTY = "Version";
- String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
- String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
- String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
- String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
-
/**
* <pre>
* Instantiate a new DMAAP Topic Sink, with following params.
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java
index 2cf07bda..9893fa15 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,8 +22,4 @@ package org.onap.policy.common.endpoints.event.comm.bus;
public interface DmaapTopicSource extends BusTopicSource {
- /**
- * factory for managing and tracking DMAAP sources.
- */
- public static DmaapTopicSourceFactory factory = new IndexedDmaapTopicSourceFactory();
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
index 35a79bf1..7b1f185b 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
@@ -8,9 +8,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -29,20 +29,12 @@ import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
* DMAAP Topic Source Factory.
*/
public interface DmaapTopicSourceFactory {
- String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
- String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
- String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
- String DME2_VERSION_PROPERTY = "Version";
- String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
- String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
- String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
- String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
/**
* Creates an DMAAP Topic Source based on properties files.
- *
+ *
* @param properties Properties containing initialization values
- *
+ *
* @return an DMAAP Topic Source
* @throws IllegalArgumentException if invalid parameters are present
*/
@@ -50,7 +42,7 @@ public interface DmaapTopicSourceFactory {
/**
* Instantiates a new DMAAP Topic Source.
- *
+ *
* @param busTopicParams parameters object
* @return a DMAAP Topic Source
*/
@@ -58,12 +50,12 @@ public interface DmaapTopicSourceFactory {
/**
* Instantiates a new DMAAP Topic Source.
- *
+ *
* @param servers list of servers
* @param topic topic name
* @param apiKey API Key
* @param apiSecret API Secret
- *
+ *
* @return an DMAAP Topic Source
* @throws IllegalArgumentException if invalid parameters are present
*/
@@ -71,10 +63,10 @@ public interface DmaapTopicSourceFactory {
/**
* Instantiates a new DMAAP Topic Source.
- *
+ *
* @param servers list of servers
* @param topic topic name
- *
+ *
* @return an DMAAP Topic Source
* @throws IllegalArgumentException if invalid parameters are present
*/
@@ -82,7 +74,7 @@ public interface DmaapTopicSourceFactory {
/**
* Destroys an DMAAP Topic Source based on a topic.
- *
+ *
* @param topic topic name
* @throws IllegalArgumentException if invalid parameters are present
*/
@@ -95,7 +87,7 @@ public interface DmaapTopicSourceFactory {
/**
* Gets an DMAAP Topic Source based on topic name.
- *
+ *
* @param topic the topic name
* @return an DMAAP Topic Source with topic name
* @throws IllegalArgumentException if an invalid topic is provided
@@ -105,7 +97,7 @@ public interface DmaapTopicSourceFactory {
/**
* Provides a snapshot of the DMAAP Topic Sources.
- *
+ *
* @return a list of the DMAAP Topic Sources
*/
List<DmaapTopicSource> inventory();
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java
index c895a409..d7f4695e 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java
@@ -96,8 +96,8 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
.topic(topic)
.apiKey(apiKey)
.apiSecret(apiSecret)
- .fetchTimeout(DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH)
- .fetchLimit(DmaapTopicSource.DEFAULT_LIMIT_FETCH)
+ .fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH)
+ .fetchLimit(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH)
.managed(true)
.useHttps(false)
.allowSelfSignedCerts(false)
@@ -133,9 +133,9 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null))
.fetchTimeout(props.getInteger(
PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX,
- DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH))
+ PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH))
.fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX,
- DmaapTopicSource.DEFAULT_LIMIT_FETCH))
+ PolicyEndPointProperties.DEFAULT_LIMIT_FETCH))
.build());
dmaapTopicSourceLst.add(uebTopicSource);
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java
index 6655aa12..5bdc8ab6 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java
@@ -100,8 +100,8 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
.topic(topic)
.apiKey(apiKey)
.apiSecret(apiSecret)
- .fetchTimeout(UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH)
- .fetchLimit(UebTopicSource.DEFAULT_LIMIT_FETCH)
+ .fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH)
+ .fetchLimit(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH)
.managed(true)
.useHttps(false)
.allowSelfSignedCerts(true).build());
@@ -136,9 +136,9 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null))
.fetchTimeout(props.getInteger(
PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX,
- UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH))
+ PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH))
.fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX,
- UebTopicSource.DEFAULT_LIMIT_FETCH))
+ PolicyEndPointProperties.DEFAULT_LIMIT_FETCH))
.build());
newUebTopicSources.add(uebTopicSource);
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactories.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactories.java
new file mode 100644
index 00000000..aa85e714
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactories.java
@@ -0,0 +1,43 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import lombok.Getter;
+
+public class NoopTopicFactories {
+
+ /**
+ * Factory for instantiation and management of sinks.
+ */
+ @Getter
+ private static final NoopTopicSinkFactory sinkFactory = new NoopTopicSinkFactory();
+
+ /**
+ * Factory for instantiation and management of sources.
+ */
+ @Getter
+ private static final NoopTopicSourceFactory sourceFactory = new NoopTopicSourceFactory();
+
+
+ private NoopTopicFactories() {
+ // do nothing
+ }
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java
index c52a30be..e7accad5 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java
@@ -30,11 +30,6 @@ import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
public class NoopTopicSink extends NoopTopicEndpoint implements TopicSink {
/**
- * Factory.
- */
- public static final NoopTopicSinkFactory factory = new NoopTopicSinkFactory();
-
- /**
* Constructs the object.
*/
public NoopTopicSink(List<String> servers, String topic) {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSource.java
index a5b9349e..6f2c4a1e 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSource.java
@@ -30,11 +30,6 @@ import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
public class NoopTopicSource extends NoopTopicEndpoint implements TopicSource {
/**
- * Factory.
- */
- public static final NoopTopicSourceFactory factory = new NoopTopicSourceFactory();
-
- /**
* Constructs the object.
*/
public NoopTopicSource(List<String> servers, String topic) {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java
new file mode 100644
index 00000000..d02758be
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java
@@ -0,0 +1,43 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import lombok.Getter;
+
+public class UebTopicFactories {
+
+ /**
+ * Factory for instantiation and management of sinks.
+ */
+ @Getter
+ private static final UebTopicSinkFactory sinkFactory = new IndexedUebTopicSinkFactory();
+
+ /**
+ * Factory for instantiation and management of sources.
+ */
+ @Getter
+ private static final UebTopicSourceFactory sourceFactory = new IndexedUebTopicSourceFactory();
+
+
+ private UebTopicFactories() {
+ // do nothing
+ }
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java
index bc1251d5..acfef6da 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,8 +25,4 @@ package org.onap.policy.common.endpoints.event.comm.bus;
*/
public interface UebTopicSink extends BusTopicSink {
- /**
- * Factory of UEB Topic Sinks for instantiation and management purposes.
- */
- public static final UebTopicSinkFactory factory = new IndexedUebTopicSinkFactory();
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java
index ee4f013b..56534309 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,8 +26,4 @@ package org.onap.policy.common.endpoints.event.comm.bus;
*/
public interface UebTopicSource extends BusTopicSource {
- /**
- * factory for managing and tracking UEB readers.
- */
- public static UebTopicSourceFactory factory = new IndexedUebTopicSourceFactory();
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
index abf793d6..b66b4ba5 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
@@ -24,7 +24,6 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal;
import com.att.nsa.cambria.client.CambriaClientBuilders;
import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
import com.att.nsa.cambria.client.CambriaConsumer;
-
import java.io.IOException;
import java.net.MalformedURLException;
import java.security.GeneralSecurityException;
@@ -38,7 +37,6 @@ import org.onap.dmaap.mr.client.MRClientFactory;
import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
import org.onap.dmaap.mr.client.response.MRConsumerResponse;
import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,9 +50,9 @@ public interface BusConsumer {
* fetch messages.
*
* @return list of messages
- * @throws Exception when error encountered by underlying libraries
+ * @throws IOException when error encountered by underlying libraries
*/
- public Iterable<String> fetch() throws InterruptedException, IOException;
+ public Iterable<String> fetch() throws IOException;
/**
* close underlying library consumer.
@@ -168,16 +166,24 @@ public interface BusConsumer {
}
@Override
- public Iterable<String> fetch() throws IOException, InterruptedException {
+ public Iterable<String> fetch() throws IOException {
try {
return getCurrentConsumer().fetch();
} catch (final IOException e) {
logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
- this.fetchTimeout, e);
+ this.fetchTimeout);
+ sleepAfterFetchFailure();
+ throw e;
+ }
+ }
+ private void sleepAfterFetchFailure() {
+ try {
this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS);
- throw e;
+ } catch (InterruptedException e) {
+ logger.warn("{}: interrupted while handling fetch error", this, e);
+ Thread.currentThread().interrupt();
}
}
@@ -306,12 +312,12 @@ public interface BusConsumer {
}
@Override
- public Iterable<String> fetch() throws InterruptedException, IOException {
+ public Iterable<String> fetch() throws IOException {
final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
if (response == null) {
logger.warn("{}: DMaaP NULL response received", this);
- closeCondition.await(fetchTimeout, TimeUnit.MILLISECONDS);
+ sleepAfterFetchFailure();
return new ArrayList<>();
} else {
logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
@@ -322,7 +328,7 @@ public interface BusConsumer {
logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
response.getResponseMessage());
- closeCondition.await(fetchTimeout, TimeUnit.MILLISECONDS);
+ sleepAfterFetchFailure();
/* fall through */
}
@@ -335,6 +341,16 @@ public interface BusConsumer {
}
}
+ private void sleepAfterFetchFailure() {
+ try {
+ this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS);
+
+ } catch (InterruptedException e) {
+ logger.warn("{}: interrupted while handling fetch error", this, e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
@Override
public void close() {
this.closeCondition.countDown();
@@ -434,7 +450,8 @@ public interface BusConsumer {
final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
- ? busTopicParams.getAdditionalProps().get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY)
+ ? busTopicParams.getAdditionalProps().get(
+ PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
: null);
if (busTopicParams.isEnvironmentInvalid()) {
@@ -474,7 +491,7 @@ public interface BusConsumer {
props = new Properties();
- props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
+ props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
props.setProperty("username", busTopicParams.getUserName());
props.setProperty("password", busTopicParams.getPassword());
@@ -489,7 +506,7 @@ public interface BusConsumer {
props.setProperty("Partner", busTopicParams.getPartner());
}
if (dme2RouteOffer != null) {
- props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
+ props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
}
props.setProperty("Latitude", busTopicParams.getLatitude());
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
index 67adf3b4..469794c7 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
@@ -26,7 +26,6 @@ import com.att.nsa.cambria.client.CambriaBatchingPublisher;
import com.att.nsa.cambria.client.CambriaClientBuilders;
import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
import com.fasterxml.jackson.annotation.JsonIgnore;
-
import java.net.MalformedURLException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
@@ -34,12 +33,10 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-
import org.apache.commons.lang3.StringUtils;
import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
import org.onap.dmaap.mr.client.response.MRPublisherResponse;
import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
import org.slf4j.Logger;
@@ -299,7 +296,8 @@ public interface BusPublisher {
busTopicParams.getUserName(),busTopicParams.getPassword(),busTopicParams.isUseHttps());
String dme2RouteOffer = busTopicParams.isAdditionalPropsValid()
- ? busTopicParams.getAdditionalProps().get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY)
+ ? busTopicParams.getAdditionalProps().get(
+ PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
: null;
validateParams(busTopicParams, dme2RouteOffer);
@@ -310,13 +308,13 @@ public interface BusPublisher {
props.setProperty("Environment", busTopicParams.getEnvironment());
props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
- props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
+ props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
if (busTopicParams.getPartner() != null) {
props.setProperty("Partner", busTopicParams.getPartner());
}
if (dme2RouteOffer != null) {
- props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
+ props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
}
props.setProperty("Latitude", busTopicParams.getLatitude());
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
index 0953465b..164f2b16 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
@@ -29,6 +29,7 @@ import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
import org.onap.policy.common.utils.network.NetworkUtil;
@@ -103,13 +104,13 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
}
if (busTopicParams.getFetchTimeout() <= 0) {
- this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
+ this.fetchTimeout = PolicyEndPointProperties.NO_TIMEOUT_MS_FETCH;
} else {
this.fetchTimeout = busTopicParams.getFetchTimeout();
}
if (busTopicParams.getFetchLimit() <= 0) {
- this.fetchLimit = NO_LIMIT_FETCH;
+ this.fetchLimit = PolicyEndPointProperties.NO_LIMIT_FETCH;
} else {
this.fetchLimit = busTopicParams.getFetchLimit();
}
@@ -225,7 +226,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
while (this.alive) {
try {
fetchAllMessages();
- } catch (Exception e) {
+ } catch (IOException | RuntimeException e) {
logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
}
}
@@ -233,7 +234,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
logger.info("{}: exiting thread", this);
}
- private void fetchAllMessages() throws InterruptedException, IOException {
+ private void fetchAllMessages() throws IOException {
for (String event : this.consumer.fetch()) {
synchronized (this) {
this.recentEvents.add(event);