diff options
author | Dan Timoney <dtimoney@att.com> | 2017-07-27 16:32:20 -0400 |
---|---|---|
committer | Dan Timoney <dtimoney@att.com> | 2017-08-01 10:42:39 -0400 |
commit | 32f16144e17d2df0831f14d9e65a83756a6ef844 (patch) | |
tree | 4f7c5f04c3e4216b1821648cc67b17720b34a8a8 /dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm | |
parent | c95543bf425024f5bd4a127422251b72e408ce2b (diff) |
Refactor dblib
Changed openecomp references in dblib, filters and sli to onap. Note: these
must be committed together to get a clean compile.
Issue: CCSDK-11
Change-Id: Ibe0f64fb20f3ae9cdda2f7ea969ca722bbde0d15
Signed-off-by: Dan Timoney <dtimoney@att.com>
Diffstat (limited to 'dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm')
3 files changed, 491 insertions, 0 deletions
diff --git a/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm/PollingWorker.java b/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm/PollingWorker.java new file mode 100644 index 00000000..b670a832 --- /dev/null +++ b/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm/PollingWorker.java @@ -0,0 +1,217 @@ +/*- + * ============LICENSE_START======================================================= + * onap + * ================================================================================ + * Copyright (C) 2016 - 2017 ONAP + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.ccsdk.sli.core.dblib.pm; + +import java.util.Iterator; +import java.util.Properties; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.TreeSet; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; + +public class PollingWorker implements Runnable { + + private Logger LOGGER = LoggerFactory.getLogger(PollingWorker.class); + + private static PollingWorker self = null; + + private LinkedBlockingQueue tasks = new LinkedBlockingQueue(100); + private long interval = 1000L; + private Thread worker = null; + private AtomicLong[] counters = null; + private int[] bucketUnit = null; + private static boolean enabled = false; + private Timer timer = null; + + public static void post(long starttime){ + PollingWorker temp = self; + if(temp != null && enabled) { + temp.register(new TestSample(starttime)); + } + } + + public static void createInistance(Properties props){ + self = new PollingWorker(props); + } + + private PollingWorker(Properties ctxprops){ + if(ctxprops==null || ctxprops.getProperty("org.onap.ccsdk.dblib.pm") == null){ + enabled = false; + } else { + if("true".equalsIgnoreCase((String)ctxprops.getProperty("org.onap.ccsdk.dblib.pm"))){ + enabled = true; + } else { + enabled = false; + } + } + + interval = Long.parseLong(( ctxprops == null || ctxprops.getProperty("org.onap.ccsdk.dblib.pm.interval") == null) ? "60" : (String)ctxprops.getProperty("org.onap.ccsdk.dblib.pm.interval")); + // '0' bucket is to count exceptions + String sampling[] = ((ctxprops == null || ctxprops.getProperty("org.onap.ccsdk.dblib.pm.sampling")==null) ? "0,2,5,10,20,50,100" : (String)ctxprops.getProperty("org.onap.ccsdk.dblib.pm.sampling")).split(","); + + if(enabled){ + bucketUnit = new int[sampling.length]; + for(int i=0, max = bucketUnit.length; i<max; i++){ + bucketUnit[i] = Integer.parseInt(sampling[i].trim()); + } + counters = new AtomicLong[bucketUnit.length+1]; + for(int i=0, max = counters.length; i<max; i++){ + counters[i] = new AtomicLong(); + } + worker = new Thread(this); + worker.setDaemon(true); + worker.start(); + timer = new Timer(true); + timer.schedule(new MyTimerTask(), interval*1000L, interval*1000L); + } + } + + private void register(TestSample object){ + try { + tasks.add(object); + } catch(Throwable exc) { + // if cannot add an object to the queue, do nothing + } + } + + private void deRegister(TestSample object){ + tasks.remove(object); + } + + public void run() { + for(;;){ + Set data = new TreeSet(); + tasks.drainTo(data); + for(Iterator it = data.iterator(); it.hasNext(); ){ + Object next = it.next(); + if(next instanceof TestSample){ + consume((TestSample)next); + } else { + System.out.println(next.getClass().getName()); + } + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + } + public void clearReqister(){ + AtomicLong[] tmp = new AtomicLong[counters.length]; + for(int i=0, max = tmp.length; i<max; i++){ + tmp[i] = new AtomicLong(); + } + AtomicLong[] tmp2 = counters; + synchronized(tmp2){ + counters = tmp; + } + StringBuffer sb = new StringBuffer("CPM: "); + for(int i=0, max = tmp2.length; i < max; i++){ + if(i==0 && bucketUnit[0]==0){ + sb.append("[Exc]="); + } else { + sb.append("["); + if(i==bucketUnit.length){ + sb.append("Other]="); + } else { + sb.append(bucketUnit[i]).append(" ms]="); + } + } + sb.append(tmp2[i].get()).append("\t"); + } + LOGGER.info(sb.toString()); + } + + class MyTimerTask extends TimerTask{ + + public void run() { + + clearReqister(); + } + + } + + private void consume(TestSample probe) { + AtomicLong[] tmp = counters; + synchronized(tmp){ + counters[getBucket(probe.getDuration())].incrementAndGet(); + } + } + + /* + * This method is used to find the offset of the bucket in + * counters. 'counters' array is 1 size longer than bucketUnit, + * hence by default it returns 'bucketUnit.length' + */ + private int getBucket(long difftime){ + for(int i=0; i<bucketUnit.length; i++){ + if(difftime < bucketUnit[i]){ + return i; + } + } + return bucketUnit.length; + } + + private static boolean isEnabled() { + return enabled; + } + /** + * @author Rich Tabedzki + * A helper class to pass measured parameter to the counter. + */ + static class TestSample implements Comparable{ + private long starttime; + private long endtime; + + public TestSample(long starttime) { + this.endtime = System.currentTimeMillis(); + this.starttime = starttime; + } + + public long getDuration(){ + return endtime - starttime; + } + + public int compareTo(Object o) { + if(o instanceof TestSample){ + TestSample x = (TestSample)o; + if(starttime < x.starttime) + return 1; + if(endtime < x.endtime) + return 1; + if(starttime > x.starttime) + return -1; + if(endtime > x.endtime) + return -1; + return 0; + } + return 1; + } + } +} diff --git a/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm/SQLExecutionMonitor.java b/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm/SQLExecutionMonitor.java new file mode 100644 index 00000000..bcd4360e --- /dev/null +++ b/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm/SQLExecutionMonitor.java @@ -0,0 +1,237 @@ +/*- + * ============LICENSE_START======================================================= + * onap + * ================================================================================ + * Copyright (C) 2016 - 2017 ONAP + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.ccsdk.sli.core.dblib.pm; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; +import java.util.Observable; +import java.util.Observer; +import java.util.SortedSet; +import java.util.Timer; +import java.util.TimerTask; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicLong; + +import org.onap.ccsdk.sli.core.dblib.DBResourceObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SQLExecutionMonitor extends Observable +{ + private static Logger LOGGER = LoggerFactory.getLogger(SQLExecutionMonitor.class); + + final static long MILISECOND = 1000000L; + final static long SECOND = 1000L*MILISECOND; + + private final Timer timer; + // collection + private final SortedSet<TestObject> innerSet; + private SQLExecutionMonitorObserver parent = null; + private final AtomicLong completionCounter; + private boolean activeState = false; + private final long interval; + private final long initialDelay; + private final long EXPECTED_TIME_TO_COMPLETE; + private final long UNPROCESSED_FAILOVER_THRESHOLD; + + private final class MonitoringTask extends TimerTask + { + + public void run() + { + try { + TestObject testObj = new TestObject(); + testObj.setStartTime(testObj.getStartTime() - EXPECTED_TIME_TO_COMPLETE); + + // take a snapshot of the current task list + TestObject[] array = innerSet.toArray(new TestObject[0]); + SortedSet<TestObject> copyCurrent = new TreeSet<TestObject>(Arrays.asList(array)); + // get the list of the tasks that are older than the specified + // interval. + SortedSet<TestObject> unprocessed = copyCurrent.headSet(testObj); + + long succesfulCount = completionCounter.get(); + int unprocessedCount = unprocessed.size(); + + if (!unprocessed.isEmpty() && unprocessedCount > UNPROCESSED_FAILOVER_THRESHOLD && succesfulCount == 0) + { + // switch the Connection Pool to passive + setChanged(); + notifyObservers("Open JDBC requests=" + unprocessedCount+" in "+SQLExecutionMonitor.this.parent.getDbConnectionName()); + } + } catch (Exception exc) { + LOGGER.error("", exc); + } finally { + completionCounter.set(0L); + } + } + } + + public static class TestObject implements Comparable<TestObject>, Serializable + { + + private static final long serialVersionUID = 1L; + private long starttime; + private long randId; + + public TestObject() + { + starttime = System.nanoTime(); + } + + public long getStartTime() + { + return starttime; + } + + public void setStartTime(long newTime) + { + starttime = newTime; + } + + public int compareTo(TestObject o) + { + if( this == o) + return 0; + if(this.starttime > o.getStartTime()) + return 1; + if(this.starttime < o.getStartTime()) + return -1; + + if(this.hashCode() > o.hashCode()) + return 1; + if(this.hashCode() < o.hashCode()) + return -1; + + return 0; + } + + public String toString() + { + return Long.toString(starttime)+"#"+ this.hashCode(); + } + + public boolean equals(Object obj) + { + if (this == obj) + return true; + + return (obj instanceof TestObject + && starttime == ((TestObject) obj).getStartTime() + && hashCode() == ((TestObject) obj).hashCode()); + } + } + + public SQLExecutionMonitor(SQLExecutionMonitorObserver parent) + { + this.parent = parent; + completionCounter = new AtomicLong(0L); + interval = parent.getInterval(); + initialDelay = parent.getInitialDelay(); + this.UNPROCESSED_FAILOVER_THRESHOLD = parent.getUnprocessedFailoverThreshold(); + this.EXPECTED_TIME_TO_COMPLETE = parent.getExpectedCompletionTime()*MILISECOND; + + innerSet = Collections.synchronizedSortedSet(new TreeSet<TestObject>()); + timer = new Timer(); + } + + public void cleanup() + { + timer.cancel(); + } + + // registerRequest + public TestObject registerRequest() + { + if(activeState) + { + TestObject test = new TestObject(); + if(innerSet.add(test)) + return test; + } + return null; + } + + // deregisterSuccessfulReguest + public boolean deregisterReguest(TestObject test) + { + if(test == null) + return false; + // remove from the collection + if(innerSet.remove(test) && activeState) + { + completionCounter.incrementAndGet(); + return true; + } + return false; + } + + public void terminate() { + timer.cancel(); + } + + /** + * @return the parent + */ + public final Object getParent() { + return parent; + } + + public void addObserver(Observer observer) + { + if(observer instanceof DBResourceObserver) + { + DBResourceObserver dbObserver = (DBResourceObserver)observer; + if(dbObserver.isMonitorDbResponse()) + { + if(countObservers() == 0) + { + TimerTask remindTask = new MonitoringTask(); + timer.schedule(remindTask, initialDelay, interval); + activeState = true; + } + } + } + super.addObserver(observer); + } + + public void deleteObserver(Observer observer) + { + super.deleteObserver(observer); + if(observer instanceof DBResourceObserver) + { + DBResourceObserver dbObserver = (DBResourceObserver)observer; + if(dbObserver.isMonitorDbResponse()) + { + if(countObservers() == 0) + { + timer.cancel(); + activeState = false; + } + } + } + } + + public final int getPorcessedConnectionsCount() { + return innerSet.size(); + } +} diff --git a/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm/SQLExecutionMonitorObserver.java b/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm/SQLExecutionMonitorObserver.java new file mode 100644 index 00000000..c17696a8 --- /dev/null +++ b/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm/SQLExecutionMonitorObserver.java @@ -0,0 +1,37 @@ +/*- + * ============LICENSE_START======================================================= + * onap + * ================================================================================ + * Copyright (C) 2016 - 2017 ONAP + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.ccsdk.sli.core.dblib.pm; + +public interface SQLExecutionMonitorObserver { + public String getDbConnectionName(); + + public long getInterval(); + public void setInterval(long value); + + public long getInitialDelay(); + public void setInitialDelay(long value); + + public long getExpectedCompletionTime(); + public void setExpectedCompletionTime(long value); + + public long getUnprocessedFailoverThreshold(); + public void setUnprocessedFailoverThreshold(long value); +} |