diff options
Diffstat (limited to 'lib/network-prioritization/src/main')
13 files changed, 1576 insertions, 0 deletions
diff --git a/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/NpmConstants.java b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/NpmConstants.java new file mode 100644 index 000000000..0583f60f7 --- /dev/null +++ b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/NpmConstants.java @@ -0,0 +1,36 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : ccsdk features + * ================================================================================ + * Copyright (C) 2021 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END======================================================= + * + */ + +package org.onap.ccsdk.features.lib.npm; + +public class NpmConstants { + public static final String PROPERTY_ENV_TYPE = "Env_Type"; + public static final String PROPERTY_ENV_PROD = "field"; + public static final String PROPERTY_ENV_SOLO = "solo"; + + public static final String MDC_REQUEST_ID = "RequestID"; + + public static final String SDNC_CONFIG_DIR = "SDNC_CONFIG_DIR"; + public static final String DEFAULT_SDNC_CONFIG_DIR = "/opt/sdnc/data/properties"; + public static final String NPM_CONFIG_PROPERTIES_FILE_NAME = "npm-config.properties"; + + private NpmConstants() {} +} diff --git a/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/NpmException.java b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/NpmException.java new file mode 100644 index 000000000..ec0f1995c --- /dev/null +++ b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/NpmException.java @@ -0,0 +1,56 @@ +/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=======================================================
+ *
+ */
+
+package org.onap.ccsdk.features.lib.npm;
+
+/**
+ * The type Npm exception.
+ * <p>
+ * It will be thrown in following cases:
+ * - Invalid Npm Transaction received
+ * - Couldn't queue RECEIVED Npm Transaction
+ * - Npm Transaction is already present in queue
+ * - Fails to invoke Service callback API
+ *
+ * @author Kapil Singal
+ */
+public class NpmException extends Exception {
+
+ /**
+ * This is a NpmException constructor
+ *
+ * @param message the message
+ */
+ public NpmException(String message) {
+ super(message);
+ }
+
+ /**
+ * This is a NpmException constructor
+ *
+ * @param message the message
+ * @param cause the cause
+ */
+ public NpmException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
diff --git a/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmServiceCallbackApi.java b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmServiceCallbackApi.java new file mode 100644 index 000000000..58e8cd550 --- /dev/null +++ b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmServiceCallbackApi.java @@ -0,0 +1,45 @@ +/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=======================================================
+ *
+ */
+
+package org.onap.ccsdk.features.lib.npm.api;
+
+import org.onap.ccsdk.features.lib.npm.NpmException;
+import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;
+
+/**
+ * The interface NpmServiceCallbackApi.
+ * This must be implemented by all Services to receive notification back
+ *
+ * @author Kapil Singal
+ */
+public interface NpmServiceCallbackApi {
+
+ /**
+ * Process to be implemented by all services on-boarding to Npm.
+ * This API will be invoked by Npm to notify Service to process a transaction
+ *
+ * @param npmTransaction the NpmTransaction
+ *
+ * @throws NpmException the npm exception
+ */
+ void process(NpmTransaction npmTransaction) throws NpmException;
+
+}
diff --git a/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmServiceManager.java b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmServiceManager.java new file mode 100644 index 000000000..0b7ab7246 --- /dev/null +++ b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmServiceManager.java @@ -0,0 +1,122 @@ +/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=======================================================
+ *
+ */
+
+package org.onap.ccsdk.features.lib.npm.api;
+
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import org.onap.ccsdk.features.lib.npm.NpmException;
+import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;
+
+/**
+ * The interface Npm Service Manager being used internally by Npm
+ * <p>
+ * Placeholder for Priority Queues holding Npm Transactions:
+ * Create: During Npm Startup if un-processed Transaction are available in NPM_TRANSACTION Table
+ * Manage: If a tx is expired sitting in queue, invoke service callback api
+ * Remove: If gets notified by a service once done with processing Transaction
+ *
+ * @author Kapil Singal
+ */
+public interface NpmServiceManager {
+
+ /**
+ * Add Transaction to queue.
+ *
+ * @param npmTransaction the NpmTransaction instance which contain serviceRequest with header information
+ *
+ * @throws NpmException the NpmException if Npm Transaction:
+ * if missing required header information
+ * couldn't be queued if priority queue is already full to it's max capacity
+ */
+ void addTransactionToQueue(NpmTransaction npmTransaction) throws NpmException;
+
+ /**
+ * Remove Transaction from queue and update connection counter
+ *
+ * @param npmTransaction the NpmTransaction instance which contain serviceRequest with header information
+ * @param updateConnectionCounter the update connection counter only if it's true
+ */
+ void removeTransactionFromQueue(NpmTransaction npmTransaction, boolean updateConnectionCounter);
+
+ /**
+ * Retrieve transaction from queue list.
+ *
+ * @param sbEndpoint the sb endpoint
+ * @param sbType the sb type
+ *
+ * @return the list of NpmTransaction
+ */
+ List<NpmTransaction> retrieveTransactionFromQueue(String sbEndpoint, String sbType);
+
+ /**
+ * Retrieve all priority queues map.
+ *
+ * @return the map
+ * <pre>
+ * npmPriorityQueues : Map [String:sbEndpoint##sbType, Map:sbPriorityQueues]
+ * sbPriorityQueues : Map [int:priority, TreeSet:priorityQueue]
+ * priorityQueue : NavigableSet [Contains the NpmTransaction Object]
+ * </pre>
+ */
+ Map<String, Map<Integer, NavigableSet<NpmTransaction>>> retrieveNpmPriorityQueues();
+
+ /**
+ * Load npm config boolean.
+ * <pre>
+ * Default properties:
+ * <b>Priority_List=0,1,2</b> Total possible priorities (lowest index is the highest priority)
+ * <b>Default_Priority=2</b> Default Priority value to be set if missing in request
+ * <b>EMS_ERICSSON=2</b> Maximum number of parallel connection that ERICSSON manufactured EMS can support
+ * <b>EMS_NOKIA=2</b> Maximum number of parallel connection that NOKIA manufactured EMS can support
+ * <b>queue_capacity_0=10</b> Total capacity of queue (maximum number of transactions a queue can hold) with priority 0
+ * <b>queue_capacity_1=7</b> Total capacity of queue (maximum number of transactions a queue can hold) with priority 1
+ * <b>queue_capacity_2=5</b> Total capacity of queue (maximum number of transactions a queue can hold) with priority 2
+ * <b>qsp_limit_0=5</b> Maximum number of transactions that can be processed from a queue before jumping to lower priority queues
+ * <b>qsp_limit_1=3</b> Maximum number of transactions that can be processed from a queue before jumping to lower priority queues
+ * <b>qsp_limit_2=2</b> Maximum number of transactions that can be processed from a queue before jumping to lower priority queues
+ * </pre>
+ *
+ * @param configFilePath the Config File Name
+ *
+ * @throws NpmException the npm exception
+ */
+ void loadNpmConfig(String configFilePath) throws NpmException;
+
+ /**
+ * Gets npm config.
+ *
+ * @param referenceKey the reference key
+ *
+ * @return the npmConfig Value
+ */
+ String getNpmConfig(String referenceKey);
+
+ /**
+ * Register service : must be called to register with Npm
+ *
+ * @param serviceKey the unique serviceKey specific to service reference
+ * @param npmServiceCallbackApi the instance of service class implementing NpmServiceCallbackApi
+ */
+ void registerService(String serviceKey, NpmServiceCallbackApi npmServiceCallbackApi);
+
+}
diff --git a/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmServiceManagerImpl.java b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmServiceManagerImpl.java new file mode 100644 index 000000000..2cdef3537 --- /dev/null +++ b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmServiceManagerImpl.java @@ -0,0 +1,430 @@ +/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=======================================================
+ *
+ */
+
+package org.onap.ccsdk.features.lib.npm.api;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.onap.ccsdk.features.lib.npm.NpmConstants;
+import org.onap.ccsdk.features.lib.npm.NpmException;
+import org.onap.ccsdk.features.lib.npm.models.NpmStatusEnum;
+import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;
+import org.onap.ccsdk.features.lib.npm.utils.NpmUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.MDC;
+
+import static org.onap.ccsdk.features.lib.npm.NpmConstants.MDC_REQUEST_ID;
+
+/**
+ * The type Npm Service Manager.
+ *
+ * @author Kapil Singal
+ */
+public class NpmServiceManagerImpl implements NpmServiceManager {
+ private static final Logger logger = LoggerFactory.getLogger(NpmServiceManagerImpl.class);
+ /**
+ * Npm `Priority Queues`.
+ * <p>
+ * npmPriorityQueues : Map <String:sbEndpoint##sbType, Map:sbPriorityQueues>
+ * sbPriorityQueues : Map <int:priority, TreeSet:priorityQueue>
+ * priorityQueue : NavigableSet [Contains the NpmTransaction Object]
+ * </p>
+ *
+ * <p>
+ * Npm will maintain multiple Priority queues per sb_endpoint (an EMS is an example of sb_endpoint)
+ * Priority Queues will be maintained per priority, having Npm Transactions, sorted based on timestamp, to accomplish a FIFO queue.
+ * </p>
+ *
+ * <p>
+ * Why NavigableSet or ConcurrentSkipListSet is being used:
+ * Need to maintain priority queue Sorted based om Transaction timestamp.!!
+ * Hence using ConcurrentSkipListSet -> which implements NavigableSet -> which extends SortedSet
+ * </p>
+ *
+ * <p>
+ * The ConcurrentSkipListSet class allows safe execution of
+ * Insertion, removal, and access operations on set concurrently by multiple threads.
+ * </p>
+ *
+ * <p>
+ * It should be preferred over other implementations of the Set interface
+ * when concurrent modification of set by multiple threads is required.
+ * </p>
+ */
+ private final Map<String, Map<Integer, NavigableSet<NpmTransaction>>> npmPriorityQueues = new ConcurrentHashMap<>();
+ private final Map<String, Integer> connectionCounter = new ConcurrentHashMap<>();
+ private final Map<String, NpmServiceCallbackApi> serviceRegistry = new ConcurrentHashMap<>();
+ private final Map<String, Integer> priorityExecState = new ConcurrentHashMap<>();
+ private final Map<String, Integer> qspExecState = new ConcurrentHashMap<>();
+
+ private final Properties npmConfigurations = new Properties();
+ private final ExecutorService executorService = Executors.newCachedThreadPool();
+
+ private boolean isProcessIngNpmPriorityQueues = false;
+
+ public NpmServiceManagerImpl() throws NpmException {
+ loadProperties();
+ Runnable processNpmPriorityQueuesTask = () -> {
+ try {
+ if (!isProcessIngNpmPriorityQueues) {
+ isProcessIngNpmPriorityQueues = true;
+ // Cleaning up MDC to make sure logging doesn't have old requestID being used for further processing
+ MDC.clear();
+ processNpmPriorityQueues();
+ isProcessIngNpmPriorityQueues = false;
+ }
+ } catch (StackOverflowError | Exception e) {
+ //Setting isProcessIngNpmPriorityQueues to false because next time when periodic task runs it should re-run processNpmPriorityQueues
+ isProcessIngNpmPriorityQueues = false;
+ // Catching both as there may not be any npm transaction at time of boot or eventual
+ logger.warn("----------- Task to processNpmPriorityQueues failed ----------- \nErrorMessage:({})", e.getMessage(), e);
+ }
+ };
+ Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(processNpmPriorityQueuesTask, 30, 5, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public String getNpmConfig(String referenceKey) {
+ return npmConfigurations.getProperty(referenceKey);
+ }
+
+ @Override
+ public void registerService(String serviceKey, NpmServiceCallbackApi npmServiceCallbackApi) {
+ logger.trace("------------- Registering NpmServiceCallbackApi with serviceKey:({}) -------------", serviceKey);
+ serviceRegistry.put(serviceKey, npmServiceCallbackApi);
+ }
+
+ @Override
+ public void addTransactionToQueue(final NpmTransaction npmTransaction) throws NpmException {
+ logger.trace("------------- Inside NPM SM addTransactionToQueue -------------");
+ logger.trace("Queuing Npm Transaction with npmTransactionId:({})", npmTransaction.getNpmTransactionId());
+
+ //Sorted Queue based on timestamp, if timestamp same for multiple Transaction it sorts those with NpmTransactionId.
+ //Using computeIfAbsent to make sure it creates priority_queue for particular sb_endpoint if not already present
+ final String npmPriorityQueueKey = NpmUtils.getNpmPriorityQueueKey(npmTransaction.getSbEndpoint(), npmTransaction.getSbType());
+
+ logger.trace("Locating npmPriorityQueues with key sbEndpoint##sbType :: ({})", npmPriorityQueueKey);
+ NavigableSet<NpmTransaction> priorityQueue = npmPriorityQueues.computeIfAbsent(npmPriorityQueueKey,
+ sbPriorityQueues -> new TreeMap<>()).computeIfAbsent(npmTransaction.getPriority(),
+ priorityQueueSet -> new ConcurrentSkipListSet<>
+ (Comparator.comparing(NpmTransaction :: getTimestamp).thenComparing(NpmTransaction :: getNpmTransactionId)));
+ logger.trace("Current queue length for sbEndpoint:({}) with priority:({}) is:({})",
+ npmTransaction.getSbEndpoint(), npmTransaction.getPriority(), priorityQueue.size());
+
+ if (priorityQueue.contains(npmTransaction)) {
+ logger.trace("Npm Transaction with npmTransactionId:({}) is already present in queued, returning without altering the queue...",
+ npmTransaction.getNpmTransactionId());
+ return;
+ }
+
+ // Compare if queue_capacity_$priority available from Configurations, else by default it will be comparing against default value = 10
+ final int queueCapacity = NumberUtils.toInt(getNpmConfig("queue_capacity_" + npmTransaction.getPriority()), 10);
+ if (priorityQueue.size() >= queueCapacity) {
+ npmTransaction.setStatus(NpmStatusEnum.OUT_OF_CAPACITY);
+ String message = String.format("Queue %s Error. Npm Queue for sb_endpoint:(%s) with Priority:(%s) is maxed out to it's capacity limit:(%s)",
+ NpmStatusEnum.OUT_OF_CAPACITY, npmTransaction.getSbEndpoint(), npmTransaction.getPriority(), queueCapacity);
+ logger.trace("Returning Error message:({})", message);
+ throw new NpmException(message);
+ }
+ npmTransaction.setStatus(NpmStatusEnum.QUEUED);
+ priorityQueue.add(npmTransaction);
+ logger.trace("Successfully queued Npm Transaction with npmTransactionId:({})", npmTransaction.getNpmTransactionId());
+ logger.trace("Updated queue length for sbEndpoint:({}) with priority:({}) is:({})",
+ npmTransaction.getSbEndpoint(), npmTransaction.getPriority(), priorityQueue.size());
+ }
+
+ @Override
+ public List<NpmTransaction> retrieveTransactionFromQueue(String sbEndpoint, String sbType) {
+ logger.trace("------------- Inside NPM SM retrieveTransactionFromQueue -------------");
+ logger.trace("Retrieving all Npm Transactions for sbEndpoint:sbType ({}:{}) from priorityQueues", sbEndpoint, sbType);
+
+ final String npmPriorityQueueKey = NpmUtils.getNpmPriorityQueueKey(sbEndpoint, sbType);
+ final List<NpmTransaction> npmTransactionList = new ArrayList<>();
+
+ //Using computeIfPresent as npmTransactionQueueMap doesn't need any alteration if Npm Transaction is not found.
+ npmPriorityQueues.computeIfPresent(npmPriorityQueueKey, (sb, sbPriorityQueues) -> {
+ sbPriorityQueues.forEach((priority, npmTransactionNavigableSet) -> {
+ npmTransactionList.addAll(npmTransactionNavigableSet);
+ });
+ return sbPriorityQueues;
+ });
+
+ logger.trace("Retrieved total {} Npm Transactions from priorityQueues", npmTransactionList.size());
+ return npmTransactionList;
+ }
+
+ @Override
+ public Map<String, Map<Integer, NavigableSet<NpmTransaction>>> retrieveNpmPriorityQueues() {
+ // TODO: Check if it should return the actual queue map instance or a clone !!
+ return npmPriorityQueues;
+ }
+
+ @Override
+ public void removeTransactionFromQueue(NpmTransaction npmTransaction, boolean updateConnectionCounter) {
+ logger.trace("------------- Inside NPM SM removeTransactionFromQueue -------------");
+ logger.trace("Removing Npm Transaction from priority queue with npmTransactionId:({})", npmTransaction.getNpmTransactionId());
+ if (updateConnectionCounter) {
+ // Updating connection counter so that next transaction can be processed from queue for same sbEndpoint
+ updateConnectionCounter(npmTransaction.getSbEndpoint(), Math.negateExact(npmTransaction.getConnectionCount()));
+ }
+ final String npmPriorityQueueKey = NpmUtils.getNpmPriorityQueueKey(npmTransaction.getSbEndpoint(), npmTransaction.getSbType());
+
+ //Using computeIfPresent as npmTransactionQueueMap doesn't need any alteration if Npm Transaction is not found.
+ npmPriorityQueues.computeIfPresent(npmPriorityQueueKey, (sb, sbPriorityQueues) -> {
+ NavigableSet<NpmTransaction> priorityQueue = sbPriorityQueues.get(npmTransaction.getPriority());
+ if (priorityQueue != null) {
+ logger.trace("Current queue length for sbEndpoint:({}) with priority:({}) is:({})",
+ npmTransaction.getSbEndpoint(), npmTransaction.getPriority(), priorityQueue.size());
+
+ priorityQueue.remove(npmTransaction);
+ logger.trace("Successfully removed Npm Transaction with npmTransactionId:({})", npmTransaction.getNpmTransactionId());
+ logger.trace("Updated queue length for sbEndpoint:({}) with priority:({}) is:({})",
+ npmTransaction.getSbEndpoint(), npmTransaction.getPriority(), priorityQueue.size());
+
+ // Cleaning up Priority Queue if empty
+ if (priorityQueue.isEmpty()) {
+ logger.trace("As priorityQueue for sbEndpoint:({}) with priority:({}) is empty, removing the priority queue.",
+ npmTransaction.getSbEndpoint(), npmTransaction.getPriority());
+ sbPriorityQueues.remove(npmTransaction.getPriority());
+ }
+ }
+ return sbPriorityQueues;
+ });
+ }
+
+ private void processExpiredNpmTransaction() {
+ logger.trace("------------- Inside NPM SM processExpiredNpmTransaction -------------");
+ if (npmPriorityQueues.isEmpty()) {
+ logger.trace("------------- No Priority Queue is present, nothing to cleanup, hence returning -------------");
+ // Returning here itself to avoid StackOverFlow or other runtime error as there may not be any npm transaction to process
+ return;
+ }
+ // Converting to entrySet so that it can parallel stream :)
+ npmPriorityQueues.entrySet().parallelStream().forEach(sbEndpointMapEntry -> {
+ sbEndpointMapEntry.getValue().entrySet().parallelStream().forEach(prioritySetEntry -> {
+
+ // TODO: Need to test, Periodic Printing of Current Queue Length is expected by Rajesh and Team
+ logger.trace("Current queue length with key sbEndpoint##sbType :: ({}) with priority:({}) is:({})",
+ sbEndpointMapEntry.getKey(), prioritySetEntry.getKey(), prioritySetEntry.getValue().size());
+
+ prioritySetEntry.getValue().parallelStream().forEach(npmTransaction -> {
+ // Checking if npmTransaction is already expired
+ if (NpmUtils.isExpired(npmTransaction)) {
+ logger.trace("Npm Transaction with npmTransactionId:({}) is Expired and will be removed from priority queue, as timeToLive has passed.",
+ npmTransaction.getNpmTransactionId());
+ npmTransaction.setStatus(NpmStatusEnum.EXPIRED);
+ npmTransaction.setMessage("Npm Transaction is Expired and will be removed from priority queue, as timeToLive has passed.");
+ Runnable notifyServiceTask = () -> invokeServiceCallbackApi(npmTransaction, false);
+ executorService.execute(notifyServiceTask);
+ removeTransactionFromQueue(npmTransaction, false);
+ }
+ });
+ });
+ });
+ logger.trace("------------- Done with checking all priority queues for any expired Npm Transaction -------------");
+ }
+
+ private void processNpmPriorityQueues() {
+ logger.trace("------------- Inside NPM SM processNpmPriorityQueues -------------");
+ if (npmPriorityQueues.isEmpty()) {
+ // Returning here itself to avoid StackOverFlow or other runtime error as there may not be any npm transaction to process
+ return;
+ }
+ logger.trace("Calling processExpiredNpmTransaction to cleanup expired Npm Transactions before processing any queue.");
+ processExpiredNpmTransaction();
+
+ // Converting to entrySet so that it can parallel stream :)
+ npmPriorityQueues.entrySet().parallelStream().forEach(sbQueueEntry -> {
+ final String sbEndpoint = sbQueueEntry.getKey().split("##")[0];
+ final String sbType = sbQueueEntry.getKey().split("##")[1];
+ final int sbConnectionLimit = NumberUtils.toInt(getNpmConfig(sbType), 1);
+
+ if (sbConnectionLimit <= connectionCounter.getOrDefault(sbEndpoint, 0)) {
+ logger.trace("Not processing any Npm Transaction for sbEndpoint:({}) as it is already occupied to it's maximum connection limit:({}).",
+ sbEndpoint, sbConnectionLimit);
+ //returning when a particular SB (sbEndpoint) is already occupied to it's max limit
+ return;
+ }
+ logger.trace("Trying to process priority queue for sbEndpoint({}) with connectionLimit:({})", sbEndpoint, sbConnectionLimit);
+ processSbPriorityQueues(sbEndpoint, sbType, sbConnectionLimit, sbQueueEntry.getValue());
+ });
+ }
+
+ private void processSbPriorityQueues(String sbEndpoint, String sbType, int sbConnectionLimit, final Map<Integer, NavigableSet<NpmTransaction>> priorityQueues) {
+ logger.trace("------------- Inside NPM SM processSbPriorityQueues -------------");
+ if (priorityQueues == null || priorityQueues.isEmpty()) {
+ // Returning here itself to avoid StackOverFlow or other runtime error as there may not be any npm transaction to process
+ return;
+ }
+
+ Iterator<Map.Entry<Integer,NavigableSet<NpmTransaction>>> priorityQueuesIterator = priorityQueues.entrySet().iterator();
+ while (priorityQueuesIterator.hasNext()) {
+ Map.Entry<Integer,NavigableSet<NpmTransaction>> entry = priorityQueuesIterator.next();
+ final NavigableSet<NpmTransaction> npmTransactions = entry.getValue();
+ if (npmTransactions.isEmpty()) {
+ priorityQueuesIterator.remove();
+ continue;
+ }
+
+ final Integer priorityIndex = entry.getKey();
+ final String qspLimitKey = "qsp_limit_" + priorityIndex;
+ final String qspStateKey = sbEndpoint + "_" + priorityIndex;
+
+ AtomicInteger qspLimit = new AtomicInteger(NumberUtils.toInt(getNpmConfig(qspLimitKey), 5));
+ AtomicInteger qspCounter = new AtomicInteger(qspExecState.getOrDefault(qspStateKey, 0));
+ logger.trace("For sbEndpoint:({}) with priority:({}) qspLimit is:({}) and current qspCounter is:({})",
+ sbEndpoint, priorityIndex, qspLimit.get(), qspCounter.get());
+
+ // On re-iteration it should be processing same priority queue which was processed last only if qsp hasn't met
+ if (NpmUtils.isAllOkToProcess(qspLimit.get(), qspCounter.get(), connectionCounter.getOrDefault(sbEndpoint, 0), sbConnectionLimit)
+ && priorityExecState.containsKey(sbEndpoint) && priorityQueues.containsKey(priorityExecState.get(sbEndpoint))
+ && !priorityIndex.equals(priorityExecState.get(sbEndpoint))) {
+ logger.trace("Last execution state for sbEndpoint:({}) was for priority:({})", sbEndpoint, priorityExecState.get(sbEndpoint));
+ return;
+ }
+
+ logger.trace("------------- Iterating npmTransactions from priorityQueue -------------");
+ for (final NpmTransaction npmTransaction : npmTransactions) {
+ // Setting RequestID in MDC same as NPM Transaction RequestId
+ MDC.put(MDC_REQUEST_ID, npmTransaction.getRequestId());
+ // Should pick npmTransactions which are in QUEUED state and are not Expired
+ if (NpmStatusEnum.QUEUED.equals(npmTransaction.getStatus()) && !NpmUtils.isExpired(npmTransaction)
+ && NpmUtils.isAllOkToProcess(qspLimit.get(), qspCounter.get(), connectionCounter.getOrDefault(sbEndpoint, 0), sbConnectionLimit)
+ && invokeServiceCallbackApi(npmTransaction, true)) {
+
+ logger.trace("------------- Updating priorityExecState and qspExecState -------------");
+ priorityExecState.put(sbEndpoint, priorityIndex);
+ qspExecState.put(qspStateKey, qspCounter.incrementAndGet());
+ logger.trace("Updated priorityExecState for sbEndpoint:({}) with priority:({})", sbEndpoint, priorityIndex);
+ logger.trace("Updated qspExecState for qspStateKey:({}) with qspCounter value:({})", qspStateKey, qspExecState.get(qspStateKey));
+ }
+ }
+ resetExecStates(sbEndpoint, sbType);
+ }
+ }
+
+ private boolean invokeServiceCallbackApi(NpmTransaction npmTransaction, boolean updateConnectionCounter) {
+ logger.trace("------------- Inside NPM SM invokeServiceCallbackApi -------------");
+ try {
+ logger.trace("Notifying Registered Service with serviceKey:({}) to process Npm Transaction with npmTransactionId:({})",
+ npmTransaction.getServiceKey(), npmTransaction.getNpmTransactionId());
+ //Setting the status as PROCESSING so that same won't be picked up again in processNpmPriorityQueues
+ npmTransaction.setStatus(NpmStatusEnum.PROCESSING);
+ serviceRegistry.get(npmTransaction.getServiceKey()).process(npmTransaction);
+ logger.trace("Notified Registered Service to process Npm Transaction with npmTransactionId:({})", npmTransaction.getNpmTransactionId());
+
+ if (updateConnectionCounter) {
+ updateConnectionCounter(npmTransaction.getSbEndpoint(), npmTransaction.getConnectionCount());
+ }
+ } catch (NpmException e) {
+ logger.error("Notifying Registered Service with serviceKey:({}) for npmTransactionId:({}) failed with ErrorMessage:({})",
+ npmTransaction.getServiceKey(), npmTransaction.getNpmTransactionId(), e.getMessage(), e);
+ removeTransactionFromQueue(npmTransaction, true);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Resetting Execution States only if QSP met for all priorities so that Npm can reiterate in Round Robin fashion.
+ */
+ private void resetExecStates(String sbEndpoint, String sbType) {
+ logger.trace("------------- Inside NPM SM resetExecStates -------------");
+ boolean temp = true;
+ final String npmPriorityQueueKey = NpmUtils.getNpmPriorityQueueKey(sbEndpoint, sbType);
+ for (int priority : NpmUtils.getPriorityList(getNpmConfig("Priority_List"))) {
+ if (npmPriorityQueues.containsKey(npmPriorityQueueKey) && npmPriorityQueues.get(npmPriorityQueueKey).containsKey(priority)
+ && !priorityExecState.containsValue(priority)) {
+ //Setting temp to false so that it won't cleanup priorityExecState and qspExecState as all priorityQueues hasn't been processed yet
+ logger.trace("Execution States won't be resetting for sbEndpoint:({}) as all priorityQueues hasn't been processed yet.", sbEndpoint);
+ temp = false;
+ break;
+ }
+ }
+ if (temp) {
+ for (int priority : NpmUtils.getPriorityList(getNpmConfig("Priority_List"))) {
+ logger.trace("Resetting Execution States for sbEndpoint:({}) as all priorityQueues processed and those needs to reiterate.", sbEndpoint);
+ priorityExecState.remove(sbEndpoint);
+ qspExecState.remove(sbEndpoint + "_" + priority);
+ }
+ }
+ }
+
+ private void updateConnectionCounter(String sbEndpoint, int connectionCounterValue) {
+ logger.trace("------------- Inside NPM SM updateConnectionCounter -------------");
+ //Updating connectionCounter value to be 0 or +ve integer whichever is larger
+ connectionCounter.computeIfPresent(sbEndpoint, (key, value) -> Math.max((value + connectionCounterValue), 0));
+ connectionCounter.computeIfAbsent(sbEndpoint, s -> Math.max(connectionCounterValue, 0));
+ logger.trace("For sbEndpoint:({}) updated connectionCounter value is:({}) ", sbEndpoint, connectionCounter.get(sbEndpoint));
+ }
+
+ private void loadProperties() throws NpmException {
+ logger.trace("------------- Inside NPM SM loadProperties -------------");
+ String propDir = System.getProperty(NpmConstants.SDNC_CONFIG_DIR);
+ if (StringUtils.isBlank(propDir)) {
+ propDir = System.getenv(NpmConstants.SDNC_CONFIG_DIR);
+ }
+ if (StringUtils.isBlank(propDir)) {
+ logger.warn("Environment variable:({}) is not set, defaulting properties directory to:({})",
+ NpmConstants.SDNC_CONFIG_DIR, NpmConstants.DEFAULT_SDNC_CONFIG_DIR);
+ propDir = NpmConstants.DEFAULT_SDNC_CONFIG_DIR;
+ }
+ loadNpmConfig(propDir + File.separator + NpmConstants.NPM_CONFIG_PROPERTIES_FILE_NAME);
+ }
+
+ @Override
+ public void loadNpmConfig(String configFilePath) throws NpmException {
+ logger.trace("------------- Inside NPM SM loadNpmConfig -------------");
+ try {
+ logger.trace("Initializing NPM Configurations from:({})", configFilePath);
+ if (new File(configFilePath).exists()) {
+ npmConfigurations.load(new FileInputStream(configFilePath));
+ } else {
+ logger.warn("Config File:({}) not found, Initializing NPM with default configurations.", configFilePath);
+ configFilePath = "properties" + File.separator + NpmConstants.NPM_CONFIG_PROPERTIES_FILE_NAME;
+ npmConfigurations.load(getClass().getClassLoader().getResourceAsStream(configFilePath));
+ }
+ logger.trace("Initialized NPM with Configurations:({}) from configFilePath:({})", npmConfigurations, configFilePath);
+ } catch (IOException e) {
+ throw new NpmException(String.format("SDN-R Internal Error: Failed to load NPM Configurations form:(%s)", configFilePath), e);
+ }
+ }
+
+}
diff --git a/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmTransactionService.java b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmTransactionService.java new file mode 100644 index 000000000..45777b895 --- /dev/null +++ b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmTransactionService.java @@ -0,0 +1,158 @@ +/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=======================================================
+ *
+ */
+
+package org.onap.ccsdk.features.lib.npm.api;
+
+import java.util.Map;
+import java.util.NavigableSet;
+import org.onap.ccsdk.features.lib.npm.models.NpmAck;
+import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * The interface Npm Transaction Service provider.
+ *
+ * @author Kapil Singal
+ */
+public interface NpmTransactionService {
+
+ /**
+ * Form npm transaction npm transaction with all mandatory/non-null parameters.
+ *
+ * @param sbEndpoint the sb endpoint (Host IP Address)
+ * @param sbType the sb type (eg. EMS_ERICSSON, EMS_NOKIA, D2_MSN etc)
+ * @param serviceKey the service key (the unique serviceKey specific to service reference)
+ * @param serviceRequest the service request (actual request payload received from upstream)
+ *
+ * @return the npm transaction
+ */
+ NpmTransaction formNpmTransaction(String sbEndpoint, String sbType, String serviceKey, Object serviceRequest);
+
+ /**
+ * Form npm transaction npm transaction with all NPM header Information (nullable and non-nullable)
+ *
+ * @param npmTransactionId the npmTransactionId (instance of UUID :: defaults to Random UUID if null)
+ * @param sbEndpoint the sbEndpoint (Host IP Address :: used to create priority queues)
+ * @param sbType the sbType (eg. EMS_ERICSSON, EMS_NOKIA, D2_MSN etc :: used to determine parallel connections it can support )
+ * @param priority the priority (n :: lowest index is the highest priority, defaults to (least priority) if -1
+ * @param connectionCount the connectionCount (n :: total number of connection a transaction would occupy while processing defaults to (1) if -1
+ * @param timestamp the timestamp (instance of Instant :: defaults to Current Time if null)
+ * @param timeToLive the timeToLive (instance of Instant :: defaults to MAX Time if null)
+ * @param serviceKey the serviceKey (the unique serviceKey specific to service reference)
+ * @param serviceRequest the serviceRequest (actual request payload received from upstream)
+ *
+ * @return the npm transaction
+ */
+ NpmTransaction formNpmTransaction(UUID npmTransactionId, String sbEndpoint, String sbType, int priority, int connectionCount,
+ Instant timestamp, Instant timeToLive, String serviceKey, Object serviceRequest);
+
+ /**
+ * Add Transactions to queue : called by all services to get Transactions added to respective priority queue
+ *
+ * @param npmTransactionList the NpmTransaction list
+ *
+ * @return the list of NpmAck with status and message
+ */
+ List<NpmAck> addTransactionsToQueue(List<NpmTransaction> npmTransactionList);
+
+ /**
+ * Add Transaction to queue : called by all services to get Transaction added to respective priority queue
+ *
+ * @param npmTransaction the NpmTransaction
+ *
+ * @return the NpmAck with status and message
+ */
+ NpmAck addTransactionsToQueue(NpmTransaction npmTransaction);
+
+ /**
+ * Remove Transactions from queue : called by all services to get Transactions removed from priority queue
+ *
+ * @param npmTransactionList the NpmTransaction list
+ *
+ * @return the list of NpmAck with status and message
+ */
+ List<NpmAck> removeTransactionsFromQueue(List<NpmTransaction> npmTransactionList);
+
+ /**
+ * Remove Transaction from queue : called by all services to get Transaction removed from priority queue
+ *
+ * @param npmTransaction the NpmTransaction
+ *
+ * @return the NpmAck with status and message
+ */
+ NpmAck removeTransactionsFromQueue(NpmTransaction npmTransaction);
+
+ /**
+ * Retrieve transaction from queue list.
+ *
+ * @param sbEndpoint the sb endpoint
+ * @param sbType the sb type
+ *
+ * @return the list
+ */
+ List<NpmTransaction> retrieveTransactionFromQueue(String sbEndpoint, String sbType);
+
+ /**
+ * Retrieve NPM priority queues map.
+ *
+ * @return the map
+ * <pre>
+ * npmPriorityQueues : Map [String:sbEndpoint##sbType, Map:sbPriorityQueues]
+ * sbPriorityQueues : Map [int:priority, TreeSet:priorityQueue]
+ * priorityQueue : NavigableSet [Contains the NpmTransaction Object]
+ * </pre>
+ */
+ Map<String, Map<Integer, NavigableSet<NpmTransaction>>> retrieveNpmPriorityQueues();
+
+ /**
+ * Register service : must be called by all services to register with Npm
+ *
+ * @param serviceKey the unique serviceKey specific to service reference
+ * @param npmServiceCallbackApi the instance of service class implementing NpmServiceCallbackApi
+ */
+ void registerService(String serviceKey, NpmServiceCallbackApi npmServiceCallbackApi);
+
+ /**
+ * Load npm config boolean.
+ * <pre>
+ * Default properties:
+ * <b>Priority_List=0,1,2</b> Total possible priorities (lowest index is the highest priority)
+ * <b>Default_Priority=2</b> Default Priority value to be set if missing in request
+ * <b>EMS_ERICSSON=2</b> Maximum number of parallel connection that ERICSSON manufactured EMS can support
+ * <b>EMS_NOKIA=2</b> Maximum number of parallel connection that NOKIA manufactured EMS can support
+ * <b>queue_capacity_0=10</b> Total capacity of queue (maximum number of transactions a queue can hold) with priority 0
+ * <b>queue_capacity_1=7</b> Total capacity of queue (maximum number of transactions a queue can hold) with priority 1
+ * <b>queue_capacity_2=5</b> Total capacity of queue (maximum number of transactions a queue can hold) with priority 2
+ * <b>qsp_limit_0=5</b> Maximum number of transactions that can be processed from a queue before jumping to lower priority queues
+ * <b>qsp_limit_1=3</b> Maximum number of transactions that can be processed from a queue before jumping to lower priority queues
+ * <b>qsp_limit_2=2</b> Maximum number of transactions that can be processed from a queue before jumping to lower priority queues
+ * </pre>
+ *
+ * @param configFilePath the Config File Name
+ *
+ * @return the boolean
+ */
+ boolean loadNpmConfig(String configFilePath);
+
+}
diff --git a/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmTransactionServiceImpl.java b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmTransactionServiceImpl.java new file mode 100644 index 000000000..5483aacee --- /dev/null +++ b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmTransactionServiceImpl.java @@ -0,0 +1,168 @@ +/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=======================================================
+ *
+ */
+
+package org.onap.ccsdk.features.lib.npm.api;
+
+import java.util.Map;
+import java.util.NavigableSet;
+import org.onap.ccsdk.features.lib.npm.NpmException;
+import org.onap.ccsdk.features.lib.npm.models.NpmAck;
+import org.onap.ccsdk.features.lib.npm.models.NpmStatusEnum;
+import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;
+import org.onap.ccsdk.features.lib.npm.utils.NpmUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * The interface Npm Transaction Service provider.
+ *
+ * @author Kapil Singal
+ */
+public class NpmTransactionServiceImpl implements NpmTransactionService {
+ private static final Logger logger = LoggerFactory.getLogger(NpmTransactionServiceImpl.class);
+ private final NpmServiceManager npmServiceManager;
+
+ public NpmTransactionServiceImpl(NpmServiceManager npmServiceManager) {
+ this.npmServiceManager = npmServiceManager;
+ }
+
+ @Override
+ public List<NpmAck> addTransactionsToQueue(List<NpmTransaction> npmTransactionList) {
+ logger.debug("------------- Inside NPM TS addTransactionsToQueue (List<NpmAck>) -------------");
+ logger.trace("Received addTransactionsToQueue(List) for Npm Transactions:({})", npmTransactionList);
+
+ List<NpmAck> npmAckList = new ArrayList<>();
+ for (NpmTransaction npmTransaction : npmTransactionList) {
+ logger.trace("Npm Transaction with npmTransactionId:({}) is being queued.", npmTransaction.getNpmTransactionId());
+ npmAckList.add(addTransactionsToQueue(npmTransaction));
+ }
+ logger.trace("Responding with npmAckList:({})", npmAckList);
+ return npmAckList;
+ }
+
+ @Override
+ public NpmAck addTransactionsToQueue(NpmTransaction npmTransaction) {
+ logger.debug("------------- Inside NPM TS addTransactionsToQueue -------------");
+ logger.trace("Received addTransactionsToQueue for Npm Transaction:({})", npmTransaction);
+
+ NpmAck npmAck = new NpmAck();
+ npmAck.setNpmTransactionId(npmTransaction.getNpmTransactionId());
+ npmAck.setRequestId(npmTransaction.getRequestId());
+ try {
+ //Validate Npm Transaction before creating entry to NPM_TRANSACTION Table.
+ npmTransaction.validate();
+ if (Arrays.stream(NpmUtils.getPriorityList(npmServiceManager.getNpmConfig("Priority_List"))).noneMatch(i -> i == npmTransaction.getPriority())) {
+ // Setting up the configured default priority value it it's missing in request
+ npmTransaction.setPriority(NumberUtils.toInt(npmServiceManager.getNpmConfig("Default_Priority"), 2));
+ logger.trace("Default priority value:({}) has been set, as it's missing in request.", npmTransaction.getPriority());
+ }
+ logger.trace("Trying to queue Npm Transaction");
+ npmServiceManager.addTransactionToQueue(npmTransaction);
+ npmAck.setStatus(NpmStatusEnum.QUEUED);
+ npmAck.setMessage("Added to Priority Queue.");
+ } catch (NpmException e) {
+ logger.error("Failed to queue Npm Transaction.\nErrorMessage: {}", e.getMessage(), e);
+ npmAck.setStatus(NpmStatusEnum.FAILED);
+ npmAck.setMessage(e.getMessage());
+ }
+ return npmAck;
+ }
+
+ @Override
+ public List<NpmAck> removeTransactionsFromQueue(List<NpmTransaction> npmTransactionList) {
+ logger.debug("------------- Inside NPM TS removeTransactionsFromQueue (List<NpmAck>) -------------");
+ logger.trace("Received removeTransactionsFromQueue for Npm Transactions:({})", npmTransactionList);
+
+ List<NpmAck> npmAckList = new ArrayList<>();
+ for (NpmTransaction npmTransaction : npmTransactionList) {
+ npmAckList.add(removeTransactionsFromQueue(npmTransaction));
+ }
+ logger.trace("Responding with npmAckList:({})", npmAckList);
+ return npmAckList;
+ }
+
+ @Override
+ public NpmAck removeTransactionsFromQueue(NpmTransaction npmTransaction) {
+ logger.debug("------------- Inside NPM TS removeTransactionsFromQueue -------------");
+ logger.trace("Received removeTransactionsFromQueue for Npm Transactions:({})", npmTransaction);
+
+ npmServiceManager.removeTransactionFromQueue(npmTransaction, true);
+
+ NpmAck npmAck = new NpmAck();
+ npmAck.setNpmTransactionId(npmTransaction.getNpmTransactionId());
+ npmAck.setRequestId(npmTransaction.getRequestId());
+ npmAck.setStatus(NpmStatusEnum.PROCESSED);
+ npmAck.setMessage("Removed from Priority Queue");
+ logger.trace("Responding with npmAck:({})", npmAck);
+ return npmAck;
+ }
+
+ @Override
+ public List<NpmTransaction> retrieveTransactionFromQueue(String sbEndpoint, String sbType) {
+ logger.debug("------------- Inside NPM TS retrieveQueueStatus (List<NpmTransaction>) -------------");
+ logger.trace("Received retrieveTransactionFromQueue for sbEndpoint:({}) and sbType:({})", sbEndpoint, sbType);
+ return npmServiceManager.retrieveTransactionFromQueue(sbEndpoint, sbType);
+ }
+
+ @Override
+ public Map<String, Map<Integer, NavigableSet<NpmTransaction>>> retrieveNpmPriorityQueues() {
+ logger.debug("------------- Inside NPM TS retrieveAllPriorityQueues (Map<sbEndpoint, Map<priority, NavigableSet<NpmTransaction>>>) -------------");
+ return npmServiceManager.retrieveNpmPriorityQueues();
+ }
+
+ @Override
+ public void registerService(String serviceKey, NpmServiceCallbackApi npmServiceCallbackApi) {
+ logger.trace("Registering NpmServiceCallbackApi with serviceKey:({})", serviceKey);
+ npmServiceManager.registerService(serviceKey, npmServiceCallbackApi);
+ }
+
+ @Override
+ public boolean loadNpmConfig(String configFilePath) {
+ try {
+ npmServiceManager.loadNpmConfig(configFilePath);
+ return true;
+ } catch (NpmException e) {
+ logger.trace("Loading configurations from file:({}), failed with:({}): ", configFilePath, e.getMessage(), e);
+ }
+ return false;
+ }
+
+ @Override
+ public NpmTransaction formNpmTransaction(String sbEndpoint, String sbType, String serviceKey, Object serviceRequest) {
+ return NpmUtils.formNpmTransaction(null, sbEndpoint, sbType, -1, -1,
+ null, null, serviceKey, serviceRequest);
+ }
+
+ @Override
+ public NpmTransaction formNpmTransaction(UUID npmTransactionId, String sbEndpoint, String sbType, int priority, int connectionCount,
+ Instant timestamp, Instant timeToLive, String serviceKey, Object serviceRequest) {
+ return NpmUtils.formNpmTransaction(npmTransactionId, sbEndpoint, sbType, priority, connectionCount,
+ timestamp, timeToLive, serviceKey, serviceRequest);
+ }
+
+}
diff --git a/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/models/NpmAck.java b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/models/NpmAck.java new file mode 100644 index 000000000..e635df85d --- /dev/null +++ b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/models/NpmAck.java @@ -0,0 +1,76 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : ccsdk features + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END======================================================= + * + */ + +package org.onap.ccsdk.features.lib.npm.models; + +import org.onap.ccsdk.features.lib.npm.utils.NpmUtils; + +import java.util.UUID; + +/** + * The type Npm Ack. + * + * @author Kapil Singal + */ +public class NpmAck { + private UUID npmTransactionId; + private String requestId; // multiple transactions can have the same requestId + private NpmStatusEnum status; + private String message; + + public UUID getNpmTransactionId() { + return npmTransactionId; + } + + public void setNpmTransactionId(UUID npmTransactionId) { + this.npmTransactionId = npmTransactionId; + } + + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public NpmStatusEnum getStatus() { + return status; + } + + public void setStatus(NpmStatusEnum status) { + this.status = status; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + @Override + public String toString() { + return NpmUtils.getJson(this); + } + +} diff --git a/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/models/NpmStatusEnum.java b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/models/NpmStatusEnum.java new file mode 100644 index 000000000..1fb4bfdda --- /dev/null +++ b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/models/NpmStatusEnum.java @@ -0,0 +1,58 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : ccsdk features + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END======================================================= + * + */ + +package org.onap.ccsdk.features.lib.npm.models; + +/** + * The enum Transaction status enum. + * + * @author Kapil Singal + */ +public enum NpmStatusEnum { + /** + * RECEIVED When Npm receives Npm Transaction + */ + RECEIVED, + /** + * QUEUED When Npm adds Npm Transaction to a priority queue + */ + QUEUED, + /** + * PROCESSING When Npm pulls and notifies Npm Transaction to service. + */ + PROCESSING, + /** + * PROCESSED When Service notifies back Npm about processing done. + */ + PROCESSED, + /** + * FAILED When Npm fails to either queue or notify Npm Transaction. + */ + FAILED, + /** + * EXPIRED When timeToLive passes current UTC time. + */ + EXPIRED, + /** + * OUT_OF_CAPACITY When priority queue is full to it's maximum capacity. + */ + OUT_OF_CAPACITY, +} diff --git a/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/models/NpmTransaction.java b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/models/NpmTransaction.java new file mode 100644 index 000000000..297d63bc1 --- /dev/null +++ b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/models/NpmTransaction.java @@ -0,0 +1,204 @@ +/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=======================================================
+ *
+ */
+
+package org.onap.ccsdk.features.lib.npm.models;
+
+import org.onap.ccsdk.features.lib.npm.NpmException;
+import org.onap.ccsdk.features.lib.npm.utils.NpmUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * The type Npm Transaction.
+ *
+ * @author Kapil Singal
+ */
+public class NpmTransaction {
+
+ private UUID npmTransactionId;
+ private String requestId; // multiple transactions can have the same requestId
+
+ private String sbEndpoint;
+ private String sbType;
+
+ private int priority = -1;
+ private int connectionCount = 1;
+
+ private Instant timestamp = Instant.now();
+ private Instant timeToLive = Instant.MAX;
+
+ private NpmStatusEnum status = NpmStatusEnum.RECEIVED;
+ private String message;
+
+ private String serviceKey;
+ private Object serviceRequest;
+
+ public UUID getNpmTransactionId() {
+ return npmTransactionId;
+ }
+
+ public void setNpmTransactionId(UUID npmTransactionId) {
+ this.npmTransactionId = npmTransactionId;
+ }
+
+ public String getRequestId() {
+ return requestId;
+ }
+
+ public void setRequestId(String requestId) {
+ this.requestId = requestId;
+ }
+
+ public String getSbEndpoint() {
+ return sbEndpoint;
+ }
+
+ public void setSbEndpoint(String sbEndpoint) {
+ this.sbEndpoint = sbEndpoint;
+ }
+
+ public String getSbType() {
+ return sbType;
+ }
+
+ public void setSbType(String sbType) {
+ this.sbType = sbType;
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public void setPriority(int priority) {
+ this.priority = priority;
+ }
+
+ public int getConnectionCount() {
+ return connectionCount;
+ }
+
+ public void setConnectionCount(int connectionCount) {
+ this.connectionCount = connectionCount;
+ }
+
+ public Instant getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(Instant timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public Instant getTimeToLive() {
+ return timeToLive;
+ }
+
+ public void setTimeToLive(Instant timeToLive) {
+ this.timeToLive = timeToLive;
+ }
+
+ public NpmStatusEnum getStatus() {
+ return status;
+ }
+
+ public void setStatus(NpmStatusEnum status) {
+ this.status = status;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ public String getServiceKey() {
+ return serviceKey;
+ }
+
+ public void setServiceKey(String serviceKey) {
+ this.serviceKey = serviceKey;
+ }
+
+ public Object getServiceRequest() {
+ return serviceRequest;
+ }
+
+ public void setServiceRequest(Object serviceRequest) {
+ this.serviceRequest = serviceRequest;
+ }
+
+ /**
+ * Validate boolean.
+ *
+ * @throws NpmException the validator exception
+ */
+ public void validate() throws NpmException {
+ if (npmTransactionId == null) {
+ throw new NpmException("Transaction is not valid: npmTransactionId is required.");
+ }
+ if (StringUtils.isBlank(sbEndpoint)) {
+ throw new NpmException("Transaction is not valid: sbEndpoint is required.");
+ }
+ if (StringUtils.isBlank(sbType)) {
+ throw new NpmException("Transaction is not valid: sbType is required.");
+ }
+ if (timestamp == null) {
+ throw new NpmException("Transaction is not valid: txTimestamp is required.");
+ }
+ if (timeToLive == null) {
+ throw new NpmException("Transaction is not valid: timeToLive is required.");
+ }
+ if (StringUtils.isBlank(serviceKey)) {
+ throw new NpmException("Transaction is not valid: serviceKey is required.");
+ }
+ if (serviceRequest == null) {
+ throw new NpmException("Transaction is not valid: serviceRequest is required.");
+ }
+ }
+
+ @Override
+ public String toString() {
+ return NpmUtils.getJson(this);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof NpmTransaction)) {
+ return false;
+ }
+ NpmTransaction that = (NpmTransaction) o;
+ return npmTransactionId.equals(that.npmTransactionId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Math.abs(Objects.hash(npmTransactionId));
+ }
+
+}
diff --git a/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/utils/NpmUtils.java b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/utils/NpmUtils.java new file mode 100644 index 000000000..735d6d91f --- /dev/null +++ b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/utils/NpmUtils.java @@ -0,0 +1,178 @@ +/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=======================================================
+ *
+ */
+
+package org.onap.ccsdk.features.lib.npm.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.commons.lang3.StringUtils;
+import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+/**
+ * The type Npm utils.
+ *
+ * @author Kapil Singal
+ */
+public class NpmUtils {
+
+ private static final Logger logger = LoggerFactory.getLogger(NpmUtils.class);
+
+ private NpmUtils() {
+ }
+
+ /**
+ * This is a getJson method
+ *
+ * @param instance the instance
+ *
+ * @return String json
+ */
+ public static String getJson(Object instance) {
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.enable(SerializationFeature.INDENT_OUTPUT);
+ return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(instance);
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ /**
+ * Gets list from json string.
+ *
+ * @param <T> the type parameter
+ * @param content the content
+ * @param valueType the value type
+ *
+ * @return the list of type parameter from json string
+ */
+ public static <T> List<T> getListFromJsonString(String content, Class<T> valueType) {
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ if (mapper.readTree(content) instanceof ArrayNode) {
+ return mapper.readValue(content, mapper.getTypeFactory().constructCollectionType(List.class, valueType));
+ } else {
+ return Collections.singletonList(mapper.readValue(content, valueType));
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ return Collections.emptyList();
+ }
+
+ /**
+ * Form npm transaction npm transaction.
+ *
+ * @param npmTransactionId the npmTransactionId (instance of UUID :: defaults to Random UUID if null)
+ * @param sbEndpoint the sbEndpoint (Host IP Address :: used to create priority queues)
+ * @param sbType the sbType (eg. EMS_ERICSSON, EMS_NOKIA, D2_MSN etc :: used to determine parallel connections it can support )
+ * @param priority the priority (n :: lowest index is the highest priority, defaults to (least priority) if -1
+ * @param connectionCount the connectionCount (n :: total number of connection a transaction would occupy while processing defaults to (1) if -1
+ * @param timestamp the timestamp (instance of Instant :: defaults to Current Time if null)
+ * @param timeToLive the timeToLive (instance of Instant :: defaults to MAX Time if null)
+ * @param serviceKey the serviceKey (the unique serviceKey specific to service reference)
+ * @param serviceRequest the serviceRequest (actual request payload received from upstream)
+ *
+ * @return the npm transaction
+ */
+ public static NpmTransaction formNpmTransaction(UUID npmTransactionId, String sbEndpoint, String sbType, int priority, int connectionCount,
+ Instant timestamp, Instant timeToLive, String serviceKey, Object serviceRequest) {
+
+ NpmTransaction npmTransaction = new NpmTransaction();
+ npmTransaction.setNpmTransactionId(npmTransactionId == null ? UUID.randomUUID() : npmTransactionId);
+ npmTransaction.setSbEndpoint(sbEndpoint);
+ npmTransaction.setSbType(sbType);
+ if (priority > -1) {
+ npmTransaction.setPriority(priority);
+ }
+ if (connectionCount > 0) {
+ npmTransaction.setConnectionCount(connectionCount);
+ }
+ npmTransaction.setTimestamp(timestamp == null ? Instant.now() : timestamp);
+ npmTransaction.setTimeToLive(timeToLive == null ? Instant.MAX : timeToLive);
+ npmTransaction.setServiceKey(serviceKey);
+ npmTransaction.setServiceRequest(serviceRequest);
+
+ return npmTransaction;
+ }
+
+ /**
+ * This is a isAllOkToProcess method
+ *
+ * @param qspLimit the queue serve period limit
+ * @param qspCounter the queue serve period counter
+ * @param connectionCounter the connection counter for sb_endpoint
+ * @param sbConnectionLimit the sb_endpoint parallel connection limit
+ *
+ * @return true if: queue serve period is not met and sbEndpoint is still having connection slot empty
+ * <p>
+ * false if: queue serve period is reached to max limit or particular SB (sbEndpoint) is already occupied to max connection limit
+ */
+ public static boolean isAllOkToProcess(int qspLimit, int qspCounter, int connectionCounter, int sbConnectionLimit) {
+ return qspLimit > qspCounter && connectionCounter < sbConnectionLimit;
+ }
+
+ /**
+ * Is expired boolean.
+ *
+ * @param npmTransaction the NpmTransaction instance
+ *
+ * @return true if timeToLive is passed than current UTC Time else false
+ */
+ public static boolean isExpired(NpmTransaction npmTransaction) {
+ return npmTransaction != null && npmTransaction.getTimeToLive().compareTo(Instant.now()) <= 0;
+ }
+
+ /**
+ * Get priority list.
+ *
+ * @param priorities defined Property_List from properties file
+ *
+ * @return the int [priorities]
+ */
+ public static int[] getPriorityList(String priorities) {
+ return Stream.of(StringUtils.defaultIfBlank(priorities, "0,1,2").split(",")).mapToInt(Integer::parseInt).toArray();
+ }
+
+ /**
+ * Gets npm priority queue key.
+ *
+ * @param sbEndpoint the sb endpoint
+ * @param sbType the sb type
+ *
+ * @return the npm priority queue key
+ */
+ public static String getNpmPriorityQueueKey(String sbEndpoint, String sbType) {
+ return sbEndpoint.concat("##").concat(sbType);
+ }
+
+}
diff --git a/lib/network-prioritization/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/lib/network-prioritization/src/main/resources/OSGI-INF/blueprint/blueprint.xml new file mode 100644 index 000000000..6ad1bc7e1 --- /dev/null +++ b/lib/network-prioritization/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ ============LICENSE_START======================================================= + ~ ONAP : ccsdk features + ~ ================================================================================ + ~ Update Copyright (C) 2021 AT&T Intellectual Property. All rights reserved. + ~ ================================================================================ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ ============LICENSE_END======================================================= + ~ + --> + +<blueprint xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0" + xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" + odl:use-default-for-reference-types="true"> + + <bean id="npmServiceManager" class="org.onap.ccsdk.features.lib.npm.api.NpmServiceManagerImpl"/> + + <bean id="npmTransactionService" class="org.onap.ccsdk.features.lib.npm.api.NpmTransactionServiceImpl"> + <argument ref="npmServiceManager"/> + </bean> + + <service ref="npmTransactionService" interface="org.onap.ccsdk.features.lib.npm.api.NpmTransactionService"/> + +</blueprint> diff --git a/lib/network-prioritization/src/main/resources/properties/npm-config.properties b/lib/network-prioritization/src/main/resources/properties/npm-config.properties new file mode 100644 index 000000000..9df556aa5 --- /dev/null +++ b/lib/network-prioritization/src/main/resources/properties/npm-config.properties @@ -0,0 +1,10 @@ +Priority_List=0,1,2 +Default_Priority=2 +EMS_ERICSSON=2 +EMS_NOKIA=2 +queue_capacity_0=10 +queue_capacity_1=7 +queue_capacity_2=5 +qsp_limit_0=5 +qsp_limit_1=3 +qsp_limit_2=2 |