summaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/drools/event/comm')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java28
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusTopicBase.java27
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java38
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java20
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java7
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java50
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java15
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java7
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/TopicBase.java9
9 files changed, 92 insertions, 109 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java
index 8e18bba8..1efaa063 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java
@@ -147,9 +147,7 @@ public interface BusPublisher {
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("CambriaPublisherWrapper []");
- return builder.toString();
+ return "CambriaPublisherWrapper []";
}
}
@@ -287,15 +285,10 @@ public interface BusPublisher {
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("DmaapPublisherWrapper [").
- append("publisher.getAuthDate()=").append(publisher.getAuthDate()).
- append(", publisher.getAuthKey()=").append(publisher.getAuthKey()).
- append(", publisher.getHost()=").append(publisher.getHost()).
- append(", publisher.getProtocolFlag()=").append(publisher.getProtocolFlag()).
- append(", publisher.getUsername()=").append(publisher.getUsername()).
- append("]");
- return builder.toString();
+ return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
+ + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
+ + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
+ + ", publisher.getUsername()=" + publisher.getUsername() + "]";
}
}
@@ -329,13 +322,16 @@ public interface BusPublisher {
String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
- if (environment == null || environment.isEmpty()) {
+ if (environment == null || environment.isEmpty()) {
throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
- } if (aftEnvironment == null || aftEnvironment.isEmpty()) {
+ }
+ if (aftEnvironment == null || aftEnvironment.isEmpty()) {
throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
- } if (latitude == null || latitude.isEmpty()) {
+ }
+ if (latitude == null || latitude.isEmpty()) {
throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
- } if (longitude == null || longitude.isEmpty()) {
+ }
+ if (longitude == null || longitude.isEmpty()) {
throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusTopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusTopicBase.java
index f145f3b8..0bf3d445 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusTopicBase.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusTopicBase.java
@@ -1,8 +1,8 @@
-/*-
+/*
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -67,8 +67,7 @@ public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled {
String apiKey,
String apiSecret,
boolean useHttps,
- boolean allowSelfSignedCerts)
- throws IllegalArgumentException {
+ boolean allowSelfSignedCerts) {
super(servers, topic);
@@ -102,6 +101,26 @@ public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled {
return allowSelfSignedCerts;
}
+ protected boolean anyNullOrEmpty(String... args) {
+ for (String arg : args) {
+ if (arg == null || arg.isEmpty()) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ protected boolean allNullOrEmpty(String... args) {
+ for (String arg : args) {
+ if (!(arg == null || arg.isEmpty())) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
@Override
public String toString() {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java
index f86c27c7..a50d7b10 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java
@@ -1,8 +1,8 @@
-/*-
+/*
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -63,8 +63,7 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
* @throws IllegalArgumentException in invalid parameters are passed in
*/
public InlineBusTopicSink(List<String> servers, String topic,
- String apiKey, String apiSecret, String partitionId, boolean useHttps, boolean allowSelfSignedCerts)
- throws IllegalArgumentException {
+ String apiKey, String apiSecret, String partitionId, boolean useHttps, boolean allowSelfSignedCerts) {
super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
@@ -82,7 +81,7 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
* {@inheritDoc}
*/
@Override
- public boolean start() throws IllegalStateException {
+ public boolean start() {
logger.info("{}: starting", this);
synchronized(this) {
@@ -132,7 +131,7 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
* {@inheritDoc}
*/
@Override
- public boolean send(String message) throws IllegalArgumentException, IllegalStateException {
+ public boolean send(String message) {
if (message == null || message.isEmpty()) {
throw new IllegalArgumentException("Message to send is empty");
@@ -181,16 +180,33 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
* {@inheritDoc}
*/
@Override
- public void shutdown() throws IllegalStateException {
+ public void shutdown() {
this.stop();
}
+
+ protected boolean anyNullOrEmpty(String... args) {
+ for (String arg : args) {
+ if (arg == null || arg.isEmpty()) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ protected boolean allNullOrEmpty(String... args) {
+ for (String arg : args) {
+ if (!(arg == null || arg.isEmpty())) {
+ return false;
+ }
+ }
+
+ return true;
+ }
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("InlineBusTopicSink [partitionId=").append(partitionId).append(", alive=").append(alive)
- .append(", publisher=").append(publisher).append("]");
- return builder.toString();
+ return "InlineBusTopicSink [partitionId=" + partitionId + ", alive=" + alive + ", publisher=" + publisher + "]";
}
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
index 718bb21d..48116e34 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
@@ -75,8 +75,7 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
String partitionKey,
String environment, String aftEnvironment, String partner,
String latitude, String longitude, Map<String,String> additionalProps,
- boolean useHttps, boolean allowSelfSignedCerts)
- throws IllegalArgumentException {
+ boolean useHttps, boolean allowSelfSignedCerts) {
super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
@@ -96,8 +95,7 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
public InlineDmaapTopicSink(List<String> servers, String topic,
String apiKey, String apiSecret,
String userName, String password,
- String partitionKey, boolean useHttps, boolean allowSelfSignedCerts)
- throws IllegalArgumentException {
+ String partitionKey, boolean useHttps, boolean allowSelfSignedCerts) {
super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
@@ -108,11 +106,7 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
@Override
public void init() {
- if ((this.environment == null || this.environment.isEmpty()) &&
- (this.aftEnvironment == null || this.aftEnvironment.isEmpty()) &&
- (this.latitude == null || this.latitude.isEmpty()) &&
- (this.longitude == null || this.longitude.isEmpty()) &&
- (this.partner == null || this.partner.isEmpty())) {
+ if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
this.publisher =
new BusPublisher.CambriaPublisherWrapper(this.servers,
this.topic,
@@ -142,11 +136,9 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("InlineDmaapTopicSink [userName=").append(userName).append(", password=").append(password)
- .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()).append(", toString()=")
- .append(super.toString()).append("]");
- return builder.toString();
+ return "InlineDmaapTopicSink [userName=" + userName + ", password=" + password
+ + ", getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()="
+ + super.toString() + "]";
}
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java
index 0c01c8b5..d1218f3f 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java
@@ -1,8 +1,8 @@
-/*-
+/*
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -57,8 +57,7 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi
String apiSecret,
String partitionId,
boolean useHttps,
- boolean allowSelfSignedCerts)
- throws IllegalArgumentException {
+ boolean allowSelfSignedCerts) {
super(servers, topic, apiKey, apiSecret, partitionId, useHttps, allowSelfSignedCerts);
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
index 5e8cf489..768046d0 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
@@ -21,17 +21,15 @@
package org.onap.policy.drools.event.comm.bus.internal;
import java.net.MalformedURLException;
-import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.onap.policy.drools.event.comm.FilterableTopicSource;
import org.onap.policy.drools.event.comm.TopicListener;
import org.onap.policy.drools.event.comm.bus.BusTopicSource;
import org.onap.policy.drools.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
import org.onap.policy.drools.utils.NetworkUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This topic source implementation specializes in reading messages
@@ -74,23 +72,10 @@ public abstract class SingleThreadedBusTopicSource
protected BusConsumer consumer;
/**
- * Am I running?
- * reflects invocation of start()/stop()
- * !locked & start() => alive
- * stop() => !alive
- */
- protected volatile boolean alive = false;
-
- /**
* Independent thread reading message over my topic
*/
protected Thread busPollerThread;
- /**
- * All my subscribers for new message notifications
- */
- protected final ArrayList<TopicListener> topicListeners = new ArrayList<>();
-
/**
*
@@ -115,8 +100,7 @@ public abstract class SingleThreadedBusTopicSource
int fetchTimeout,
int fetchLimit,
boolean useHttps,
- boolean allowSelfSignedCerts)
- throws IllegalArgumentException {
+ boolean allowSelfSignedCerts) {
super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
@@ -152,8 +136,7 @@ public abstract class SingleThreadedBusTopicSource
public abstract void init() throws MalformedURLException;
@Override
- public void register(TopicListener topicListener)
- throws IllegalArgumentException {
+ public void register(TopicListener topicListener) {
super.register(topicListener);
@@ -182,7 +165,7 @@ public abstract class SingleThreadedBusTopicSource
}
@Override
- public boolean start() throws IllegalStateException {
+ public boolean start() {
logger.info("{}: starting", this);
synchronized(this) {
@@ -299,23 +282,10 @@ public abstract class SingleThreadedBusTopicSource
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("SingleThreadedBusTopicSource [consumerGroup=").append(consumerGroup)
- .append(", consumerInstance=").append(consumerInstance).append(", fetchTimeout=").append(fetchTimeout)
- .append(", fetchLimit=").append(fetchLimit)
- .append(", consumer=").append(this.consumer).append(", alive=")
- .append(alive).append(", locked=").append(locked).append(", uebThread=").append(busPollerThread)
- .append(", topicListeners=").append(topicListeners.size()).append(", toString()=").append(super.toString())
- .append("]");
- return builder.toString();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isAlive() {
- return alive;
+ return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
+ + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer="
+ + this.consumer + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread
+ + ", topicListeners=" + topicListeners.size() + ", toString()=" + super.toString() + "]";
}
/**
@@ -338,7 +308,7 @@ public abstract class SingleThreadedBusTopicSource
* {@inheritDoc}
*/
@Override
- public void shutdown() throws IllegalStateException {
+ public void shutdown() {
this.stop();
this.topicListeners.clear();
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
index b0c456da..6a9a2d6d 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
@@ -79,8 +79,7 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
int fetchTimeout, int fetchLimit,
String environment, String aftEnvironment, String partner,
String latitude, String longitude, Map<String,String> additionalProps,
- boolean useHttps, boolean allowSelfSignedCerts)
- throws IllegalArgumentException {
+ boolean useHttps, boolean allowSelfSignedCerts) {
super(servers, topic, apiKey, apiSecret,
consumerGroup, consumerInstance,
@@ -123,8 +122,7 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
String apiKey, String apiSecret,
String userName, String password,
String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts)
- throws IllegalArgumentException {
+ int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) {
super(servers, topic, apiKey, apiSecret,
@@ -148,19 +146,14 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
*/
@Override
public void init() throws MalformedURLException {
- if (this.userName == null || this.userName.isEmpty() ||
- this.password == null || this.password.isEmpty()) {
+ if (anyNullOrEmpty(this.userName, this.password)) {
this.consumer =
new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic,
this.apiKey, this.apiSecret,
this.consumerGroup, this.consumerInstance,
this.fetchTimeout, this.fetchLimit,
this.useHttps, this.allowSelfSignedCerts);
- } else if ((this.environment == null || this.environment.isEmpty()) &&
- (this.aftEnvironment == null || this.aftEnvironment.isEmpty()) &&
- (this.latitude == null || this.latitude.isEmpty()) &&
- (this.longitude == null || this.longitude.isEmpty()) &&
- (this.partner == null || this.partner.isEmpty())) {
+ } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
this.consumer =
new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic,
this.apiKey, this.apiSecret,
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java
index e394e3df..fcbee631 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java
@@ -1,8 +1,8 @@
-/*-
+/*
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -52,8 +52,7 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource
public SingleThreadedUebTopicSource(List<String> servers, String topic,
String apiKey, String apiSecret,
String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts)
- throws IllegalArgumentException {
+ int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) {
super(servers, topic, apiKey, apiSecret,
consumerGroup, consumerInstance,
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/TopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/TopicBase.java
index b1b29808..22c6b1d5 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/TopicBase.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/TopicBase.java
@@ -1,8 +1,8 @@
-/*-
+/*
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -82,7 +82,7 @@ public abstract class TopicBase implements Topic {
* @return a Topic Base
* @throws IllegalArgumentException if invalid parameters are present
*/
- public TopicBase(List<String> servers, String topic) throws IllegalArgumentException {
+ public TopicBase(List<String> servers, String topic) {
if (servers == null || servers.isEmpty()) {
throw new IllegalArgumentException("Server(s) must be provided");
@@ -97,8 +97,7 @@ public abstract class TopicBase implements Topic {
}
@Override
- public void register(TopicListener topicListener)
- throws IllegalArgumentException {
+ public void register(TopicListener topicListener) {
logger.info("{}: registering {}", this, topicListener);