diff options
Diffstat (limited to 'policy-endpoints/src/main')
6 files changed, 34 insertions, 44 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicEndpoint.java index ef002f52..73c61651 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicEndpoint.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicEndpoint.java @@ -39,7 +39,7 @@ public abstract class NoopTopicEndpoint extends TopicBase { private static Logger logger = LoggerFactory.getLogger(NoopTopicEndpoint.class); /** - * {@inheritDoc}. + * Constructs the object. */ public NoopTopicEndpoint(List<String> servers, String topic) { super(servers, topic); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java index d3745940..c52a30be 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java @@ -35,7 +35,7 @@ public class NoopTopicSink extends NoopTopicEndpoint implements TopicSink { public static final NoopTopicSinkFactory factory = new NoopTopicSinkFactory(); /** - * {@inheritDoc}. + * Constructs the object. */ public NoopTopicSink(List<String> servers, String topic) { super(servers, topic); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSource.java index 95ed0fe6..a5b9349e 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSource.java @@ -35,7 +35,7 @@ public class NoopTopicSource extends NoopTopicEndpoint implements TopicSource { public static final NoopTopicSourceFactory factory = new NoopTopicSourceFactory(); /** - * {@inheritDoc}. + * Constructs the object. */ public NoopTopicSource(List<String> servers, String topic) { super(servers, topic); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index e20fb598..abf793d6 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -31,7 +31,8 @@ import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.Map; import java.util.Properties; - +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.impl.MRConsumerImpl; @@ -112,7 +113,7 @@ public interface BusConsumer { /** * close condition. */ - protected Object closeCondition = new Object(); + protected CountDownLatch closeCondition = new CountDownLatch(1); /** * Cambria Consumer Wrapper. @@ -172,10 +173,9 @@ public interface BusConsumer { return getCurrentConsumer().fetch(); } catch (final IOException e) { logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(), - this.fetchTimeout); - synchronized (this.closeCondition) { - this.closeCondition.wait(this.fetchTimeout); - } + this.fetchTimeout, e); + + this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); throw e; } @@ -183,10 +183,7 @@ public interface BusConsumer { @Override public void close() { - synchronized (closeCondition) { - closeCondition.notifyAll(); - } - + this.closeCondition.countDown(); getCurrentConsumer().close(); } @@ -267,7 +264,7 @@ public interface BusConsumer { /** * close condition. */ - protected Object closeCondition = new Object(); + protected CountDownLatch closeCondition = new CountDownLatch(1); /** * MR Consumer. @@ -276,7 +273,7 @@ public interface BusConsumer { /** * MR Consumer Wrapper. - * + * * <p>servers messaging bus hosts * topic topic * apiKey API Key @@ -314,12 +311,10 @@ public interface BusConsumer { if (response == null) { logger.warn("{}: DMaaP NULL response received", this); - synchronized (closeCondition) { - closeCondition.wait(fetchTimeout); - } + closeCondition.await(fetchTimeout, TimeUnit.MILLISECONDS); return new ArrayList<>(); } else { - logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(), + logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage()); if (!"200".equals(response.getResponseCode())) { @@ -327,9 +322,7 @@ public interface BusConsumer { logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(), response.getResponseMessage()); - synchronized (closeCondition) { - closeCondition.wait(fetchTimeout); - } + closeCondition.await(fetchTimeout, TimeUnit.MILLISECONDS); /* fall through */ } @@ -344,10 +337,7 @@ public interface BusConsumer { @Override public void close() { - synchronized (closeCondition) { - closeCondition.notifyAll(); - } - + this.closeCondition.countDown(); this.consumer.close(); } @@ -372,7 +362,7 @@ public interface BusConsumer { /** * BusTopicParams contain the following parameters. * MR Consumer Wrapper. - * + * * <p>servers messaging bus hosts * topic topic * apiKey API Key @@ -432,9 +422,9 @@ public interface BusConsumer { /** * Constructor. - * + * * @param busTopicParams topic paramters - * + * * @throws MalformedURLException must provide a valid URL */ public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index 1c2d6eeb..1c85fa97 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -8,9 +8,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -49,7 +49,7 @@ public interface BusPublisher { /** * sends a message. - * + * * @param partitionId id * @param message the message * @return true if success, false otherwise @@ -74,11 +74,11 @@ public interface BusPublisher { */ @JsonIgnore @GsonJsonIgnore - protected volatile CambriaBatchingPublisher publisher; + protected CambriaBatchingPublisher publisher; /** * Constructor. - * + * * @param busTopicParams topic parameters */ public CambriaPublisherWrapper(BusTopicParams busTopicParams) { @@ -290,10 +290,10 @@ public interface BusPublisher { } public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper { - + /** * Constructor. - * + * * @param busTopicParams topic parameters */ public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyJerseyServer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyJerseyServer.java index b7bafe0d..0aaf1ccb 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyJerseyServer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/internal/JettyJerseyServer.java @@ -145,14 +145,14 @@ public class JettyJerseyServer extends JettyServletServer { */ protected synchronized ServletHolder getServlet(String servletPath) { - ServletHolder jerseyServlet = servlets.get(servletPath); - if (jerseyServlet == null) { - jerseyServlet = context.addServlet(org.glassfish.jersey.servlet.ServletContainer.class, servletPath); + return servlets.computeIfAbsent(servletPath, key -> { + + ServletHolder jerseyServlet = + context.addServlet(org.glassfish.jersey.servlet.ServletContainer.class, servletPath); jerseyServlet.setInitOrder(0); - servlets.put(servletPath, jerseyServlet); - } - return jerseyServlet; + return jerseyServlet; + }); } @Override @@ -238,7 +238,7 @@ public class JettyJerseyServer extends JettyServletServer { } jerseyServlet.setInitParameter(ServerProperties.PROVIDER_CLASSNAMES, initClasses); - + jerseyServlet.setInitParameter(ServerProperties.METAINF_SERVICES_LOOKUP_DISABLE, "true"); } |