aboutsummaryrefslogtreecommitdiffstats
path: root/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm
diff options
context:
space:
mode:
authorDan Timoney <dtimoney@att.com>2017-07-27 16:32:20 -0400
committerDan Timoney <dtimoney@att.com>2017-08-01 10:42:39 -0400
commit32f16144e17d2df0831f14d9e65a83756a6ef844 (patch)
tree4f7c5f04c3e4216b1821648cc67b17720b34a8a8 /dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm
parentc95543bf425024f5bd4a127422251b72e408ce2b (diff)
Refactor dblib
Changed openecomp references in dblib, filters and sli to onap. Note: these must be committed together to get a clean compile. Issue: CCSDK-11 Change-Id: Ibe0f64fb20f3ae9cdda2f7ea969ca722bbde0d15 Signed-off-by: Dan Timoney <dtimoney@att.com>
Diffstat (limited to 'dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm')
-rw-r--r--dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm/PollingWorker.java217
-rw-r--r--dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm/SQLExecutionMonitor.java237
-rw-r--r--dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm/SQLExecutionMonitorObserver.java37
3 files changed, 491 insertions, 0 deletions
diff --git a/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm/PollingWorker.java b/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm/PollingWorker.java
new file mode 100644
index 00000000..b670a832
--- /dev/null
+++ b/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm/PollingWorker.java
@@ -0,0 +1,217 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * onap
+ * ================================================================================
+ * Copyright (C) 2016 - 2017 ONAP
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.core.dblib.pm;
+
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class PollingWorker implements Runnable {
+
+ private Logger LOGGER = LoggerFactory.getLogger(PollingWorker.class);
+
+ private static PollingWorker self = null;
+
+ private LinkedBlockingQueue tasks = new LinkedBlockingQueue(100);
+ private long interval = 1000L;
+ private Thread worker = null;
+ private AtomicLong[] counters = null;
+ private int[] bucketUnit = null;
+ private static boolean enabled = false;
+ private Timer timer = null;
+
+ public static void post(long starttime){
+ PollingWorker temp = self;
+ if(temp != null && enabled) {
+ temp.register(new TestSample(starttime));
+ }
+ }
+
+ public static void createInistance(Properties props){
+ self = new PollingWorker(props);
+ }
+
+ private PollingWorker(Properties ctxprops){
+ if(ctxprops==null || ctxprops.getProperty("org.onap.ccsdk.dblib.pm") == null){
+ enabled = false;
+ } else {
+ if("true".equalsIgnoreCase((String)ctxprops.getProperty("org.onap.ccsdk.dblib.pm"))){
+ enabled = true;
+ } else {
+ enabled = false;
+ }
+ }
+
+ interval = Long.parseLong(( ctxprops == null || ctxprops.getProperty("org.onap.ccsdk.dblib.pm.interval") == null) ? "60" : (String)ctxprops.getProperty("org.onap.ccsdk.dblib.pm.interval"));
+ // '0' bucket is to count exceptions
+ String sampling[] = ((ctxprops == null || ctxprops.getProperty("org.onap.ccsdk.dblib.pm.sampling")==null) ? "0,2,5,10,20,50,100" : (String)ctxprops.getProperty("org.onap.ccsdk.dblib.pm.sampling")).split(",");
+
+ if(enabled){
+ bucketUnit = new int[sampling.length];
+ for(int i=0, max = bucketUnit.length; i<max; i++){
+ bucketUnit[i] = Integer.parseInt(sampling[i].trim());
+ }
+ counters = new AtomicLong[bucketUnit.length+1];
+ for(int i=0, max = counters.length; i<max; i++){
+ counters[i] = new AtomicLong();
+ }
+ worker = new Thread(this);
+ worker.setDaemon(true);
+ worker.start();
+ timer = new Timer(true);
+ timer.schedule(new MyTimerTask(), interval*1000L, interval*1000L);
+ }
+ }
+
+ private void register(TestSample object){
+ try {
+ tasks.add(object);
+ } catch(Throwable exc) {
+ // if cannot add an object to the queue, do nothing
+ }
+ }
+
+ private void deRegister(TestSample object){
+ tasks.remove(object);
+ }
+
+ public void run() {
+ for(;;){
+ Set data = new TreeSet();
+ tasks.drainTo(data);
+ for(Iterator it = data.iterator(); it.hasNext(); ){
+ Object next = it.next();
+ if(next instanceof TestSample){
+ consume((TestSample)next);
+ } else {
+ System.out.println(next.getClass().getName());
+ }
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ }
+ public void clearReqister(){
+ AtomicLong[] tmp = new AtomicLong[counters.length];
+ for(int i=0, max = tmp.length; i<max; i++){
+ tmp[i] = new AtomicLong();
+ }
+ AtomicLong[] tmp2 = counters;
+ synchronized(tmp2){
+ counters = tmp;
+ }
+ StringBuffer sb = new StringBuffer("CPM: ");
+ for(int i=0, max = tmp2.length; i < max; i++){
+ if(i==0 && bucketUnit[0]==0){
+ sb.append("[Exc]=");
+ } else {
+ sb.append("[");
+ if(i==bucketUnit.length){
+ sb.append("Other]=");
+ } else {
+ sb.append(bucketUnit[i]).append(" ms]=");
+ }
+ }
+ sb.append(tmp2[i].get()).append("\t");
+ }
+ LOGGER.info(sb.toString());
+ }
+
+ class MyTimerTask extends TimerTask{
+
+ public void run() {
+
+ clearReqister();
+ }
+
+ }
+
+ private void consume(TestSample probe) {
+ AtomicLong[] tmp = counters;
+ synchronized(tmp){
+ counters[getBucket(probe.getDuration())].incrementAndGet();
+ }
+ }
+
+ /*
+ * This method is used to find the offset of the bucket in
+ * counters. 'counters' array is 1 size longer than bucketUnit,
+ * hence by default it returns 'bucketUnit.length'
+ */
+ private int getBucket(long difftime){
+ for(int i=0; i<bucketUnit.length; i++){
+ if(difftime < bucketUnit[i]){
+ return i;
+ }
+ }
+ return bucketUnit.length;
+ }
+
+ private static boolean isEnabled() {
+ return enabled;
+ }
+ /**
+ * @author Rich Tabedzki
+ * A helper class to pass measured parameter to the counter.
+ */
+ static class TestSample implements Comparable{
+ private long starttime;
+ private long endtime;
+
+ public TestSample(long starttime) {
+ this.endtime = System.currentTimeMillis();
+ this.starttime = starttime;
+ }
+
+ public long getDuration(){
+ return endtime - starttime;
+ }
+
+ public int compareTo(Object o) {
+ if(o instanceof TestSample){
+ TestSample x = (TestSample)o;
+ if(starttime < x.starttime)
+ return 1;
+ if(endtime < x.endtime)
+ return 1;
+ if(starttime > x.starttime)
+ return -1;
+ if(endtime > x.endtime)
+ return -1;
+ return 0;
+ }
+ return 1;
+ }
+ }
+}
diff --git a/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm/SQLExecutionMonitor.java b/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm/SQLExecutionMonitor.java
new file mode 100644
index 00000000..bcd4360e
--- /dev/null
+++ b/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm/SQLExecutionMonitor.java
@@ -0,0 +1,237 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * onap
+ * ================================================================================
+ * Copyright (C) 2016 - 2017 ONAP
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.core.dblib.pm;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Observable;
+import java.util.Observer;
+import java.util.SortedSet;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.onap.ccsdk.sli.core.dblib.DBResourceObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SQLExecutionMonitor extends Observable
+{
+ private static Logger LOGGER = LoggerFactory.getLogger(SQLExecutionMonitor.class);
+
+ final static long MILISECOND = 1000000L;
+ final static long SECOND = 1000L*MILISECOND;
+
+ private final Timer timer;
+ // collection
+ private final SortedSet<TestObject> innerSet;
+ private SQLExecutionMonitorObserver parent = null;
+ private final AtomicLong completionCounter;
+ private boolean activeState = false;
+ private final long interval;
+ private final long initialDelay;
+ private final long EXPECTED_TIME_TO_COMPLETE;
+ private final long UNPROCESSED_FAILOVER_THRESHOLD;
+
+ private final class MonitoringTask extends TimerTask
+ {
+
+ public void run()
+ {
+ try {
+ TestObject testObj = new TestObject();
+ testObj.setStartTime(testObj.getStartTime() - EXPECTED_TIME_TO_COMPLETE);
+
+ // take a snapshot of the current task list
+ TestObject[] array = innerSet.toArray(new TestObject[0]);
+ SortedSet<TestObject> copyCurrent = new TreeSet<TestObject>(Arrays.asList(array));
+ // get the list of the tasks that are older than the specified
+ // interval.
+ SortedSet<TestObject> unprocessed = copyCurrent.headSet(testObj);
+
+ long succesfulCount = completionCounter.get();
+ int unprocessedCount = unprocessed.size();
+
+ if (!unprocessed.isEmpty() && unprocessedCount > UNPROCESSED_FAILOVER_THRESHOLD && succesfulCount == 0)
+ {
+ // switch the Connection Pool to passive
+ setChanged();
+ notifyObservers("Open JDBC requests=" + unprocessedCount+" in "+SQLExecutionMonitor.this.parent.getDbConnectionName());
+ }
+ } catch (Exception exc) {
+ LOGGER.error("", exc);
+ } finally {
+ completionCounter.set(0L);
+ }
+ }
+ }
+
+ public static class TestObject implements Comparable<TestObject>, Serializable
+ {
+
+ private static final long serialVersionUID = 1L;
+ private long starttime;
+ private long randId;
+
+ public TestObject()
+ {
+ starttime = System.nanoTime();
+ }
+
+ public long getStartTime()
+ {
+ return starttime;
+ }
+
+ public void setStartTime(long newTime)
+ {
+ starttime = newTime;
+ }
+
+ public int compareTo(TestObject o)
+ {
+ if( this == o)
+ return 0;
+ if(this.starttime > o.getStartTime())
+ return 1;
+ if(this.starttime < o.getStartTime())
+ return -1;
+
+ if(this.hashCode() > o.hashCode())
+ return 1;
+ if(this.hashCode() < o.hashCode())
+ return -1;
+
+ return 0;
+ }
+
+ public String toString()
+ {
+ return Long.toString(starttime)+"#"+ this.hashCode();
+ }
+
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ return true;
+
+ return (obj instanceof TestObject
+ && starttime == ((TestObject) obj).getStartTime()
+ && hashCode() == ((TestObject) obj).hashCode());
+ }
+ }
+
+ public SQLExecutionMonitor(SQLExecutionMonitorObserver parent)
+ {
+ this.parent = parent;
+ completionCounter = new AtomicLong(0L);
+ interval = parent.getInterval();
+ initialDelay = parent.getInitialDelay();
+ this.UNPROCESSED_FAILOVER_THRESHOLD = parent.getUnprocessedFailoverThreshold();
+ this.EXPECTED_TIME_TO_COMPLETE = parent.getExpectedCompletionTime()*MILISECOND;
+
+ innerSet = Collections.synchronizedSortedSet(new TreeSet<TestObject>());
+ timer = new Timer();
+ }
+
+ public void cleanup()
+ {
+ timer.cancel();
+ }
+
+ // registerRequest
+ public TestObject registerRequest()
+ {
+ if(activeState)
+ {
+ TestObject test = new TestObject();
+ if(innerSet.add(test))
+ return test;
+ }
+ return null;
+ }
+
+ // deregisterSuccessfulReguest
+ public boolean deregisterReguest(TestObject test)
+ {
+ if(test == null)
+ return false;
+ // remove from the collection
+ if(innerSet.remove(test) && activeState)
+ {
+ completionCounter.incrementAndGet();
+ return true;
+ }
+ return false;
+ }
+
+ public void terminate() {
+ timer.cancel();
+ }
+
+ /**
+ * @return the parent
+ */
+ public final Object getParent() {
+ return parent;
+ }
+
+ public void addObserver(Observer observer)
+ {
+ if(observer instanceof DBResourceObserver)
+ {
+ DBResourceObserver dbObserver = (DBResourceObserver)observer;
+ if(dbObserver.isMonitorDbResponse())
+ {
+ if(countObservers() == 0)
+ {
+ TimerTask remindTask = new MonitoringTask();
+ timer.schedule(remindTask, initialDelay, interval);
+ activeState = true;
+ }
+ }
+ }
+ super.addObserver(observer);
+ }
+
+ public void deleteObserver(Observer observer)
+ {
+ super.deleteObserver(observer);
+ if(observer instanceof DBResourceObserver)
+ {
+ DBResourceObserver dbObserver = (DBResourceObserver)observer;
+ if(dbObserver.isMonitorDbResponse())
+ {
+ if(countObservers() == 0)
+ {
+ timer.cancel();
+ activeState = false;
+ }
+ }
+ }
+ }
+
+ public final int getPorcessedConnectionsCount() {
+ return innerSet.size();
+ }
+}
diff --git a/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm/SQLExecutionMonitorObserver.java b/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm/SQLExecutionMonitorObserver.java
new file mode 100644
index 00000000..c17696a8
--- /dev/null
+++ b/dblib/provider/src/main/java/org/onap/ccsdk/sli/core/dblib/pm/SQLExecutionMonitorObserver.java
@@ -0,0 +1,37 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * onap
+ * ================================================================================
+ * Copyright (C) 2016 - 2017 ONAP
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.core.dblib.pm;
+
+public interface SQLExecutionMonitorObserver {
+ public String getDbConnectionName();
+
+ public long getInterval();
+ public void setInterval(long value);
+
+ public long getInitialDelay();
+ public void setInitialDelay(long value);
+
+ public long getExpectedCompletionTime();
+ public void setExpectedCompletionTime(long value);
+
+ public long getUnprocessedFailoverThreshold();
+ public void setUnprocessedFailoverThreshold(long value);
+}