diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus')
9 files changed, 92 insertions, 109 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java index 8e18bba8..1efaa063 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java @@ -147,9 +147,7 @@ public interface BusPublisher { @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("CambriaPublisherWrapper []"); - return builder.toString(); + return "CambriaPublisherWrapper []"; } } @@ -287,15 +285,10 @@ public interface BusPublisher { @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("DmaapPublisherWrapper ["). - append("publisher.getAuthDate()=").append(publisher.getAuthDate()). - append(", publisher.getAuthKey()=").append(publisher.getAuthKey()). - append(", publisher.getHost()=").append(publisher.getHost()). - append(", publisher.getProtocolFlag()=").append(publisher.getProtocolFlag()). - append(", publisher.getUsername()=").append(publisher.getUsername()). - append("]"); - return builder.toString(); + return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate() + + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()=" + + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag() + + ", publisher.getUsername()=" + publisher.getUsername() + "]"; } } @@ -329,13 +322,16 @@ public interface BusPublisher { String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); - if (environment == null || environment.isEmpty()) { + if (environment == null || environment.isEmpty()) { throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - } if (aftEnvironment == null || aftEnvironment.isEmpty()) { + } + if (aftEnvironment == null || aftEnvironment.isEmpty()) { throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - } if (latitude == null || latitude.isEmpty()) { + } + if (latitude == null || latitude.isEmpty()) { throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - } if (longitude == null || longitude.isEmpty()) { + } + if (longitude == null || longitude.isEmpty()) { throw parmException(topic, PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusTopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusTopicBase.java index f145f3b8..0bf3d445 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusTopicBase.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusTopicBase.java @@ -1,8 +1,8 @@ -/*- +/* * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,8 +67,7 @@ public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled { String apiKey, String apiSecret, boolean useHttps, - boolean allowSelfSignedCerts) - throws IllegalArgumentException { + boolean allowSelfSignedCerts) { super(servers, topic); @@ -102,6 +101,26 @@ public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled { return allowSelfSignedCerts; } + protected boolean anyNullOrEmpty(String... args) { + for (String arg : args) { + if (arg == null || arg.isEmpty()) { + return true; + } + } + + return false; + } + + protected boolean allNullOrEmpty(String... args) { + for (String arg : args) { + if (!(arg == null || arg.isEmpty())) { + return false; + } + } + + return true; + } + @Override public String toString() { diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java index f86c27c7..a50d7b10 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java @@ -1,8 +1,8 @@ -/*- +/* * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -63,8 +63,7 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi * @throws IllegalArgumentException in invalid parameters are passed in */ public InlineBusTopicSink(List<String> servers, String topic, - String apiKey, String apiSecret, String partitionId, boolean useHttps, boolean allowSelfSignedCerts) - throws IllegalArgumentException { + String apiKey, String apiSecret, String partitionId, boolean useHttps, boolean allowSelfSignedCerts) { super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts); @@ -82,7 +81,7 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi * {@inheritDoc} */ @Override - public boolean start() throws IllegalStateException { + public boolean start() { logger.info("{}: starting", this); synchronized(this) { @@ -132,7 +131,7 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi * {@inheritDoc} */ @Override - public boolean send(String message) throws IllegalArgumentException, IllegalStateException { + public boolean send(String message) { if (message == null || message.isEmpty()) { throw new IllegalArgumentException("Message to send is empty"); @@ -181,16 +180,33 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi * {@inheritDoc} */ @Override - public void shutdown() throws IllegalStateException { + public void shutdown() { this.stop(); } + + protected boolean anyNullOrEmpty(String... args) { + for (String arg : args) { + if (arg == null || arg.isEmpty()) { + return true; + } + } + + return false; + } + + protected boolean allNullOrEmpty(String... args) { + for (String arg : args) { + if (!(arg == null || arg.isEmpty())) { + return false; + } + } + + return true; + } @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("InlineBusTopicSink [partitionId=").append(partitionId).append(", alive=").append(alive) - .append(", publisher=").append(publisher).append("]"); - return builder.toString(); + return "InlineBusTopicSink [partitionId=" + partitionId + ", alive=" + alive + ", publisher=" + publisher + "]"; } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java index 718bb21d..48116e34 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java @@ -75,8 +75,7 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop String partitionKey, String environment, String aftEnvironment, String partner, String latitude, String longitude, Map<String,String> additionalProps, - boolean useHttps, boolean allowSelfSignedCerts) - throws IllegalArgumentException { + boolean useHttps, boolean allowSelfSignedCerts) { super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts); @@ -96,8 +95,7 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop public InlineDmaapTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String userName, String password, - String partitionKey, boolean useHttps, boolean allowSelfSignedCerts) - throws IllegalArgumentException { + String partitionKey, boolean useHttps, boolean allowSelfSignedCerts) { super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts); @@ -108,11 +106,7 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop @Override public void init() { - if ((this.environment == null || this.environment.isEmpty()) && - (this.aftEnvironment == null || this.aftEnvironment.isEmpty()) && - (this.latitude == null || this.latitude.isEmpty()) && - (this.longitude == null || this.longitude.isEmpty()) && - (this.partner == null || this.partner.isEmpty())) { + if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) { this.publisher = new BusPublisher.CambriaPublisherWrapper(this.servers, this.topic, @@ -142,11 +136,9 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("InlineDmaapTopicSink [userName=").append(userName).append(", password=").append(password) - .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()).append(", toString()=") - .append(super.toString()).append("]"); - return builder.toString(); + return "InlineDmaapTopicSink [userName=" + userName + ", password=" + password + + ", getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()=" + + super.toString() + "]"; } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java index 0c01c8b5..d1218f3f 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java @@ -1,8 +1,8 @@ -/*- +/* * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -57,8 +57,7 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi String apiSecret, String partitionId, boolean useHttps, - boolean allowSelfSignedCerts) - throws IllegalArgumentException { + boolean allowSelfSignedCerts) { super(servers, topic, apiKey, apiSecret, partitionId, useHttps, allowSelfSignedCerts); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java index 5e8cf489..768046d0 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -21,17 +21,15 @@ package org.onap.policy.drools.event.comm.bus.internal; import java.net.MalformedURLException; -import java.util.ArrayList; import java.util.List; import java.util.UUID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.onap.policy.drools.event.comm.FilterableTopicSource; import org.onap.policy.drools.event.comm.TopicListener; import org.onap.policy.drools.event.comm.bus.BusTopicSource; import org.onap.policy.drools.event.comm.bus.internal.BusConsumer.FilterableBusConsumer; import org.onap.policy.drools.utils.NetworkUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This topic source implementation specializes in reading messages @@ -74,23 +72,10 @@ public abstract class SingleThreadedBusTopicSource protected BusConsumer consumer; /** - * Am I running? - * reflects invocation of start()/stop() - * !locked & start() => alive - * stop() => !alive - */ - protected volatile boolean alive = false; - - /** * Independent thread reading message over my topic */ protected Thread busPollerThread; - /** - * All my subscribers for new message notifications - */ - protected final ArrayList<TopicListener> topicListeners = new ArrayList<>(); - /** * @@ -115,8 +100,7 @@ public abstract class SingleThreadedBusTopicSource int fetchTimeout, int fetchLimit, boolean useHttps, - boolean allowSelfSignedCerts) - throws IllegalArgumentException { + boolean allowSelfSignedCerts) { super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts); @@ -152,8 +136,7 @@ public abstract class SingleThreadedBusTopicSource public abstract void init() throws MalformedURLException; @Override - public void register(TopicListener topicListener) - throws IllegalArgumentException { + public void register(TopicListener topicListener) { super.register(topicListener); @@ -182,7 +165,7 @@ public abstract class SingleThreadedBusTopicSource } @Override - public boolean start() throws IllegalStateException { + public boolean start() { logger.info("{}: starting", this); synchronized(this) { @@ -299,23 +282,10 @@ public abstract class SingleThreadedBusTopicSource @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("SingleThreadedBusTopicSource [consumerGroup=").append(consumerGroup) - .append(", consumerInstance=").append(consumerInstance).append(", fetchTimeout=").append(fetchTimeout) - .append(", fetchLimit=").append(fetchLimit) - .append(", consumer=").append(this.consumer).append(", alive=") - .append(alive).append(", locked=").append(locked).append(", uebThread=").append(busPollerThread) - .append(", topicListeners=").append(topicListeners.size()).append(", toString()=").append(super.toString()) - .append("]"); - return builder.toString(); - } - - /** - * {@inheritDoc} - */ - @Override - public boolean isAlive() { - return alive; + return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance + + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer=" + + this.consumer + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread + + ", topicListeners=" + topicListeners.size() + ", toString()=" + super.toString() + "]"; } /** @@ -338,7 +308,7 @@ public abstract class SingleThreadedBusTopicSource * {@inheritDoc} */ @Override - public void shutdown() throws IllegalStateException { + public void shutdown() { this.stop(); this.topicListeners.clear(); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java index b0c456da..6a9a2d6d 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java @@ -79,8 +79,7 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource int fetchTimeout, int fetchLimit, String environment, String aftEnvironment, String partner, String latitude, String longitude, Map<String,String> additionalProps, - boolean useHttps, boolean allowSelfSignedCerts) - throws IllegalArgumentException { + boolean useHttps, boolean allowSelfSignedCerts) { super(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, @@ -123,8 +122,7 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource String apiKey, String apiSecret, String userName, String password, String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) - throws IllegalArgumentException { + int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) { super(servers, topic, apiKey, apiSecret, @@ -148,19 +146,14 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource */ @Override public void init() throws MalformedURLException { - if (this.userName == null || this.userName.isEmpty() || - this.password == null || this.password.isEmpty()) { + if (anyNullOrEmpty(this.userName, this.password)) { this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit, this.useHttps, this.allowSelfSignedCerts); - } else if ((this.environment == null || this.environment.isEmpty()) && - (this.aftEnvironment == null || this.aftEnvironment.isEmpty()) && - (this.latitude == null || this.latitude.isEmpty()) && - (this.longitude == null || this.longitude.isEmpty()) && - (this.partner == null || this.partner.isEmpty())) { + } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) { this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java index e394e3df..fcbee631 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java @@ -1,8 +1,8 @@ -/*- +/* * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,8 +52,7 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource public SingleThreadedUebTopicSource(List<String> servers, String topic, String apiKey, String apiSecret, String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) - throws IllegalArgumentException { + int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) { super(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/TopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/TopicBase.java index b1b29808..22c6b1d5 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/TopicBase.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/TopicBase.java @@ -1,8 +1,8 @@ -/*- +/* * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -82,7 +82,7 @@ public abstract class TopicBase implements Topic { * @return a Topic Base * @throws IllegalArgumentException if invalid parameters are present */ - public TopicBase(List<String> servers, String topic) throws IllegalArgumentException { + public TopicBase(List<String> servers, String topic) { if (servers == null || servers.isEmpty()) { throw new IllegalArgumentException("Server(s) must be provided"); @@ -97,8 +97,7 @@ public abstract class TopicBase implements Topic { } @Override - public void register(TopicListener topicListener) - throws IllegalArgumentException { + public void register(TopicListener topicListener) { logger.info("{}: registering {}", this, topicListener); |