From 25423c50e504676f15c7a57c03aad40bfc35c7e6 Mon Sep 17 00:00:00 2001 From: Michael Dürre Date: Wed, 20 Jul 2022 09:32:50 +0200 Subject: migrate sdnr features to sulfur MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit fix sdnr code for sulfur Issue-ID: CCSDK-3692 Signed-off-by: Michael Dürre Change-Id: I0a62ade424bb978222e7ce6450215fb327f957b7 Signed-off-by: Michael Dürre --- .../sdnr/wt/common/database/SearchResult.java | 5 ++ .../sdnr/wt/common/database/data/IndicesEntry.java | 3 - .../common/threading/GenericRunnableFactory.java | 4 +- .../wt/common/threading/KeyBasedThreadpool.java | 65 ++++++++++++-- .../sdnr/wt/common/test/TestBaseHttpClient.java | 10 +-- .../wt/common/test/TestKeybasedThreadpool.java | 100 +++++++++++++++++++++ .../wt/common/test/helper/HelpServletBase.java | 1 - 7 files changed, 165 insertions(+), 23 deletions(-) create mode 100644 sdnr/wt/common/src/test/java/org/onap/ccsdk/features/sdnr/wt/common/test/TestKeybasedThreadpool.java (limited to 'sdnr/wt/common/src') diff --git a/sdnr/wt/common/src/main/java/org/onap/ccsdk/features/sdnr/wt/common/database/SearchResult.java b/sdnr/wt/common/src/main/java/org/onap/ccsdk/features/sdnr/wt/common/database/SearchResult.java index 529853e08..233f48fa3 100644 --- a/sdnr/wt/common/src/main/java/org/onap/ccsdk/features/sdnr/wt/common/database/SearchResult.java +++ b/sdnr/wt/common/src/main/java/org/onap/ccsdk/features/sdnr/wt/common/database/SearchResult.java @@ -23,7 +23,9 @@ package org.onap.ccsdk.features.sdnr.wt.common.database; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; public class SearchResult { @@ -53,6 +55,9 @@ public class SearchResult { public List getHits() { return this.hits; } + public Set getHitSets() { + return new HashSet<>(this.hits); + } public long getTotal() { return this.total; diff --git a/sdnr/wt/common/src/main/java/org/onap/ccsdk/features/sdnr/wt/common/database/data/IndicesEntry.java b/sdnr/wt/common/src/main/java/org/onap/ccsdk/features/sdnr/wt/common/database/data/IndicesEntry.java index b73d3eadc..914ee1cf1 100644 --- a/sdnr/wt/common/src/main/java/org/onap/ccsdk/features/sdnr/wt/common/database/data/IndicesEntry.java +++ b/sdnr/wt/common/src/main/java/org/onap/ccsdk/features/sdnr/wt/common/database/data/IndicesEntry.java @@ -22,11 +22,8 @@ package org.onap.ccsdk.features.sdnr.wt.common.database.data; import java.text.ParseException; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.stream.Collectors; /** diff --git a/sdnr/wt/common/src/main/java/org/onap/ccsdk/features/sdnr/wt/common/threading/GenericRunnableFactory.java b/sdnr/wt/common/src/main/java/org/onap/ccsdk/features/sdnr/wt/common/threading/GenericRunnableFactory.java index c163facbb..2f052d6b2 100644 --- a/sdnr/wt/common/src/main/java/org/onap/ccsdk/features/sdnr/wt/common/threading/GenericRunnableFactory.java +++ b/sdnr/wt/common/src/main/java/org/onap/ccsdk/features/sdnr/wt/common/threading/GenericRunnableFactory.java @@ -26,7 +26,5 @@ public abstract class GenericRunnableFactory { public GenericRunnableFactory() { } - public Runnable create(S arg, GenericRunnableFactoryCallback callback) { - return null; - } + public abstract Runnable create(final T key, final S arg); } diff --git a/sdnr/wt/common/src/main/java/org/onap/ccsdk/features/sdnr/wt/common/threading/KeyBasedThreadpool.java b/sdnr/wt/common/src/main/java/org/onap/ccsdk/features/sdnr/wt/common/threading/KeyBasedThreadpool.java index f41a9038f..f507eec13 100644 --- a/sdnr/wt/common/src/main/java/org/onap/ccsdk/features/sdnr/wt/common/threading/KeyBasedThreadpool.java +++ b/sdnr/wt/common/src/main/java/org/onap/ccsdk/features/sdnr/wt/common/threading/KeyBasedThreadpool.java @@ -23,23 +23,27 @@ package org.onap.ccsdk.features.sdnr.wt.common.threading; import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map.Entry; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Threadpool for running n instances per key T * * @author jack * - * @param - * @param + * @param type of key for the pools + * @param type of arg to create a runner */ public class KeyBasedThreadpool implements GenericRunnableFactoryCallback { + private static final Logger LOG = LoggerFactory.getLogger(KeyBasedThreadpool.class); private final Queue> queue; private final List runningKeys; private final int keyPoolSize; @@ -57,37 +61,80 @@ public class KeyBasedThreadpool implements GenericRunnableFactoryCallback< this.keyPoolSize = keyPoolSize; this.factory = factory; this.executor = Executors.newFixedThreadPool(poolSize); - this.runningKeys = new ArrayList<>(); + this.runningKeys = Collections.synchronizedList(new ArrayList()); + LOG.info("starting key-based threadpool with keysize={} and size={}", keyPoolSize, poolSize); } public void execute(T key, S arg) { if (this.isKeyPoolSizeReached(key)) { + LOG.debug("pool size for key {} reached. add to queue", key); queue.add(new SimpleEntry<>(key, arg)); + } else { + LOG.debug("starting executor for key {}.", key); this.runningKeys.add(key); - this.executor.execute(this.factory.create(arg, this)); + this.executor.execute(new RunnableWrapper(this.factory.create(key, arg), key, this)); } } private void executeNext() { Entry entry = this.queue.peek(); - if (!this.isKeyPoolSizeReached(entry.getKey())) { - this.queue.poll(); - this.runningKeys.add(entry.getKey()); - this.executor.execute(this.factory.create(entry.getValue(), this)); + if (entry != null) { + LOG.debug("executing next for key {} with arg {}", entry.getKey(), entry.getValue()); + if (!this.isKeyPoolSizeReached(entry.getKey())) { + this.queue.poll(); + this.runningKeys.add(entry.getKey()); + this.executor.execute(new RunnableWrapper(this.factory.create(entry.getKey(), entry.getValue()), + entry.getKey(), this)); + } else { + LOG.debug("key pool size reached. waiting for someone else to stop"); + } + } else { + LOG.info("nothing to execute. queue is empty."); } } private boolean isKeyPoolSizeReached(T key) { + LOG.trace("running keys size={}", this.runningKeys.size()); return this.runningKeys.stream().filter(e -> e == key).count() >= this.keyPoolSize; } @Override - public void onFinish(T key) { + public synchronized void onFinish(T key) { + LOG.debug("executor finished for key {}.", key); this.runningKeys.remove(key); this.executeNext(); } + public void join() { + LOG.debug("wait for all executors to finish"); + while (this.runningKeys.size() > 0 && this.queue.size() > 0) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + private static class RunnableWrapper implements Runnable { + + private final Runnable inner; + private final GenericRunnableFactoryCallback callback; + private final T key; + public RunnableWrapper(Runnable inner, T key, GenericRunnableFactoryCallback cb) { + this.inner = inner; + this.callback = cb; + this.key = key; + } + + @Override + public void run() { + this.inner.run(); + this.callback.onFinish(this.key); + } + + } } diff --git a/sdnr/wt/common/src/test/java/org/onap/ccsdk/features/sdnr/wt/common/test/TestBaseHttpClient.java b/sdnr/wt/common/src/test/java/org/onap/ccsdk/features/sdnr/wt/common/test/TestBaseHttpClient.java index 3584d7f28..253b790eb 100644 --- a/sdnr/wt/common/src/test/java/org/onap/ccsdk/features/sdnr/wt/common/test/TestBaseHttpClient.java +++ b/sdnr/wt/common/src/test/java/org/onap/ccsdk/features/sdnr/wt/common/test/TestBaseHttpClient.java @@ -25,7 +25,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; import java.io.IOException; import java.io.OutputStream; import java.net.InetSocketAddress; @@ -33,18 +35,12 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; - import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.onap.ccsdk.features.sdnr.wt.common.http.BaseHTTPClient; import org.onap.ccsdk.features.sdnr.wt.common.http.BaseHTTPResponse; -import com.sun.net.httpserver.HttpExchange; -import com.sun.net.httpserver.HttpHandler; -import com.sun.net.httpserver.HttpServer; - -@SuppressWarnings("restriction") public class TestBaseHttpClient { public static final String HTTPMETHOD_GET = "GET"; diff --git a/sdnr/wt/common/src/test/java/org/onap/ccsdk/features/sdnr/wt/common/test/TestKeybasedThreadpool.java b/sdnr/wt/common/src/test/java/org/onap/ccsdk/features/sdnr/wt/common/test/TestKeybasedThreadpool.java new file mode 100644 index 000000000..868275690 --- /dev/null +++ b/sdnr/wt/common/src/test/java/org/onap/ccsdk/features/sdnr/wt/common/test/TestKeybasedThreadpool.java @@ -0,0 +1,100 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : ccsdk features + * ================================================================================ + * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + */ +package org.onap.ccsdk.features.sdnr.wt.common.test; + +import java.util.Random; + +import org.junit.Ignore; +import org.junit.Test; +import org.onap.ccsdk.features.sdnr.wt.common.threading.GenericRunnableFactory; +import org.onap.ccsdk.features.sdnr.wt.common.threading.KeyBasedThreadpool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestKeybasedThreadpool { + + + private static final Logger LOG = LoggerFactory.getLogger(TestKeybasedThreadpool.class); + private static final String KEY_A = "a"; + private static final String KEY_B = "b"; + private static final String KEY_C = "c"; + private static final String KEY_D = "d"; + + @Ignore + @Test + public void test1() { + GenericRunnableFactory factory1 = + new GenericRunnableFactory() { + @Override + public Runnable create(final String key, final TestClass arg) { + return new Runnable() { + + @Override + public void run() { + final String key2 = arg.value; + final long sleep = arg.sleep; + LOG.info("{}: sleeping now for {} seconds",key2, sleep); + try { + Thread.sleep(sleep*1000); + } catch (InterruptedException e) { + LOG.error("InterruptedException",e); + Thread.currentThread().interrupt(); + } + LOG.info("{}: finished",key2); + } + }; + } + }; + LOG.info("starting"); + KeyBasedThreadpool threadpool = new KeyBasedThreadpool(10, 1, factory1); + threadpool.execute(KEY_A, new TestClass(KEY_A)); + threadpool.execute(KEY_A, new TestClass(KEY_A)); + threadpool.execute(KEY_A, new TestClass(KEY_A)); + threadpool.execute(KEY_B, new TestClass(KEY_B)); + threadpool.execute(KEY_C, new TestClass(KEY_C)); + threadpool.execute(KEY_D, new TestClass(KEY_D)); + threadpool.execute(KEY_D, new TestClass(KEY_D)); + threadpool.join(); + LOG.info("done"); + } + + private static int counter=0; + + + public class TestClass { + protected final long sleep; + private final String value; + + public TestClass(String value) { + + this.value = value+ String.valueOf(counter++); + Random rnd = new Random(); + this.sleep = rnd.nextInt(20); + LOG.info("instatiate {}",this); + } + + @Override + public String toString() { + return "TestClass [sleep=" + sleep + ", value=" + value + "]"; + } + } +} diff --git a/sdnr/wt/common/src/test/java/org/onap/ccsdk/features/sdnr/wt/common/test/helper/HelpServletBase.java b/sdnr/wt/common/src/test/java/org/onap/ccsdk/features/sdnr/wt/common/test/helper/HelpServletBase.java index a3d2c9be9..6913ec21e 100644 --- a/sdnr/wt/common/src/test/java/org/onap/ccsdk/features/sdnr/wt/common/test/helper/HelpServletBase.java +++ b/sdnr/wt/common/src/test/java/org/onap/ccsdk/features/sdnr/wt/common/test/helper/HelpServletBase.java @@ -45,7 +45,6 @@ import org.junit.Before; import org.onap.ccsdk.features.sdnr.wt.common.test.ServletInputStreamFromByteArrayInputStream; import org.onap.ccsdk.features.sdnr.wt.common.test.ServletOutputStreamToStringWriter; -@SuppressWarnings("restriction") public class HelpServletBase { public static final String RESPONSE_GET = "This is the response get"; -- cgit 1.2.3-korg