aboutsummaryrefslogtreecommitdiffstats
path: root/sdnr/wt/common
diff options
context:
space:
mode:
Diffstat (limited to 'sdnr/wt/common')
-rw-r--r--sdnr/wt/common/src/main/java/org/onap/ccsdk/features/sdnr/wt/common/threading/KeyBasedThreadpool.java9
1 files changed, 6 insertions, 3 deletions
diff --git a/sdnr/wt/common/src/main/java/org/onap/ccsdk/features/sdnr/wt/common/threading/KeyBasedThreadpool.java b/sdnr/wt/common/src/main/java/org/onap/ccsdk/features/sdnr/wt/common/threading/KeyBasedThreadpool.java
index f507eec13..c6bd16484 100644
--- a/sdnr/wt/common/src/main/java/org/onap/ccsdk/features/sdnr/wt/common/threading/KeyBasedThreadpool.java
+++ b/sdnr/wt/common/src/main/java/org/onap/ccsdk/features/sdnr/wt/common/threading/KeyBasedThreadpool.java
@@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
public class KeyBasedThreadpool<T, S> implements GenericRunnableFactoryCallback<T> {
private static final Logger LOG = LoggerFactory.getLogger(KeyBasedThreadpool.class);
- private final Queue<Entry<T, S>> queue;
+ private final ConcurrentLinkedQueue<Entry<T, S>> queue;
private final List<T> runningKeys;
private final int keyPoolSize;
private final GenericRunnableFactory<T, S> factory;
@@ -65,7 +65,7 @@ public class KeyBasedThreadpool<T, S> implements GenericRunnableFactoryCallback<
LOG.info("starting key-based threadpool with keysize={} and size={}", keyPoolSize, poolSize);
}
- public void execute(T key, S arg) {
+ public synchronized void execute(T key, S arg) {
if (this.isKeyPoolSizeReached(key)) {
LOG.debug("pool size for key {} reached. add to queue", key);
queue.add(new SimpleEntry<>(key, arg));
@@ -97,6 +97,9 @@ public class KeyBasedThreadpool<T, S> implements GenericRunnableFactoryCallback<
private boolean isKeyPoolSizeReached(T key) {
LOG.trace("running keys size={}", this.runningKeys.size());
+ if (this.keyPoolSize == 1) {
+ return this.runningKeys.contains(key);
+ }
return this.runningKeys.stream().filter(e -> e == key).count() >= this.keyPoolSize;
}
@@ -107,7 +110,7 @@ public class KeyBasedThreadpool<T, S> implements GenericRunnableFactoryCallback<
this.executeNext();
}
- public void join() {
+ public synchronized void join() {
LOG.debug("wait for all executors to finish");
while (this.runningKeys.size() > 0 && this.queue.size() > 0) {
try {