diff options
Diffstat (limited to 'src/main/java/org/onap/dcaegen2/services/sonhms/child/ChildThread.java')
-rw-r--r-- | src/main/java/org/onap/dcaegen2/services/sonhms/child/ChildThread.java | 245 |
1 files changed, 245 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/sonhms/child/ChildThread.java b/src/main/java/org/onap/dcaegen2/services/sonhms/child/ChildThread.java new file mode 100644 index 0000000..eff7a84 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/sonhms/child/ChildThread.java @@ -0,0 +1,245 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * son-handler + * ================================================================================ + * Copyright (C) 2019 Wipro Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ + +package org.onap.dcaegen2.services.sonhms.child; + +import org.onap.dcaegen2.services.sonhms.Configuration; +import org.onap.dcaegen2.services.sonhms.dao.ClusterDetailsRepository; +import org.onap.dcaegen2.services.sonhms.model.FapServiceList; +import org.onap.dcaegen2.services.sonhms.model.ThreadId; +import org.onap.dcaegen2.services.sonhms.restclient.AsyncResponseBody; +import org.onap.dcaegen2.services.sonhms.utils.BeanUtil; +import org.onap.dcaegen2.services.sonhms.utils.ClusterUtils; + +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import org.slf4j.Logger; +import org.slf4j.MDC; + + +public class ChildThread implements Runnable { + + private BlockingQueue<List<String>> childStatusUpdate; + private BlockingQueue<FapServiceList> queue = new LinkedBlockingQueue<>(); + + private static Map<Long, AsyncResponseBody> responseMap = new HashMap<>(); + private Graph cluster; + private ThreadId threadId; + FapServiceList fapServiceList = new FapServiceList(); + private static final Logger log = org.slf4j.LoggerFactory.getLogger(ChildThread.class); + + /** + * Constructor with parameters. + */ + public ChildThread(BlockingQueue<List<String>> childStatusUpdate, Graph cluster, + BlockingQueue<FapServiceList> queue, ThreadId threadId) { + super(); + this.childStatusUpdate = childStatusUpdate; + this.queue = queue; + this.threadId = threadId; + this.cluster = cluster; + } + + /** + * Puts notification in queue. + */ + public void putInQueue(FapServiceList fapserviceList) { + try { + queue.put(fapserviceList); + } catch (InterruptedException e) { + log.error(" The Thread is Interrupted", e); + Thread.currentThread().interrupt(); + } + } + + /** + * Puts notification in queue with notify. + */ + public void putInQueueWithNotify(FapServiceList fapserviceList) { + synchronized (queue) { + try { + queue.put(fapserviceList); + queue.notifyAll(); + } catch (InterruptedException e) { + log.error(" The Thread is Interrupted", e); + Thread.currentThread().interrupt(); + } + + } + + } + + /** + * Puts response in queue. + */ + public static void putResponse(Long threadId, AsyncResponseBody obj) { + synchronized (responseMap) { + responseMap.put(threadId, obj); + } + + } + + public static Map<Long, AsyncResponseBody> getResponseMap() { + return responseMap; + } + + @Override + public void run() { + + threadId.setChildThreadId(Thread.currentThread().getId()); + synchronized (threadId) { + threadId.notifyAll(); + } + + MDC.put("logFileName", Thread.currentThread().getName()); + log.debug("Starting child thread"); + + try { + fapServiceList = queue.take(); + if (log.isDebugEnabled()) { + log.debug("fapServicelist: {}", fapServiceList); + } + } catch (InterruptedException e1) { + log.error("InterruptedException is {}", e1); + Thread.currentThread().interrupt(); + } + + ClusterFormation clusterFormation = new ClusterFormation(queue); + StateOof oof = new StateOof(childStatusUpdate); + ClusterUtils clusterUtils = new ClusterUtils(); + Detection detect = new Detection(); + + try { + String networkId = fapServiceList.getCellConfig().getLte().getRan().getNeighborListInUse() + .getLteNeighborListInUseLteCell().get(0).getPlmnid(); + + Boolean done = false; + + while (!done) { + + Map<String, ArrayList<Integer>> collisionConfusionResult = detect.detectCollisionConfusion(cluster); + Boolean trigger = clusterFormation.triggerOrWait(collisionConfusionResult); + + if (!trigger) { + collisionConfusionResult = clusterFormation.waitForNotification(collisionConfusionResult, cluster); + } + oof.triggerOof(collisionConfusionResult, networkId); + + if (isNotificationsBuffered()) { + List<FapServiceList> fapServiceLists = bufferNotification(); + for (FapServiceList fapService : fapServiceLists) { + cluster = clusterUtils.modifyCluster(cluster, fapService); + } + String cellPciNeighbourString = cluster.getPciNeighbourJson(); + UUID clusterId = cluster.getGraphId(); + ClusterDetailsRepository clusterDetailsRepository = BeanUtil + .getBean(ClusterDetailsRepository.class); + clusterDetailsRepository.updateCluster(cellPciNeighbourString, clusterId.toString()); + + } else { + done = true; + } + + } + + } catch (Exception e) { + log.error("{}", e); + } + + cleanup(); + } + + private boolean isNotificationsBuffered() { + synchronized (queue) { + + try { + while (queue.isEmpty()) { + queue.wait(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + return true; + } + + /** + * cleanup resources. + */ + private void cleanup() { + log.debug("cleaning up database and killing child thread"); + ClusterDetailsRepository clusterDetailsRepository = BeanUtil.getBean(ClusterDetailsRepository.class); + clusterDetailsRepository.deleteByChildThreadId(threadId.getChildThreadId()); + log.debug("Child thread :{} {}", Thread.currentThread().getId(), "completed"); + MDC.remove("logFileName"); + + } + + /** + * Buffer Notification. + */ + public List<FapServiceList> bufferNotification() { + + // Processing Buffered notifications + + List<FapServiceList> fapServiceLists = new ArrayList<>(); + + Configuration config = Configuration.getInstance(); + + int bufferTime = config.getBufferTime(); + + Timestamp currentTime = new Timestamp(System.currentTimeMillis()); + log.debug("Current time {}", currentTime); + + Timestamp laterTime = new Timestamp(System.currentTimeMillis()); + log.debug("Later time {}", laterTime); + + long difference = laterTime.getTime() - currentTime.getTime(); + while (difference < bufferTime) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + log.error("InterruptedException {}", e); + Thread.currentThread().interrupt(); + + } + laterTime = new Timestamp(System.currentTimeMillis()); + difference = laterTime.getTime() - currentTime.getTime(); + + log.debug("Timer has run for seconds {}", difference); + + if (!queue.isEmpty()) { + FapServiceList fapService; + fapService = queue.poll(); + fapServiceLists.add(fapService); + } + } + return fapServiceLists; + } + +} |