From 7ab93201e557976ed8b383cb5652fa129d7b36f7 Mon Sep 17 00:00:00 2001 From: Zlatko Murgoski Date: Mon, 31 Dec 2018 11:55:42 +0100 Subject: Fix sonar violation Fix sonar violation' Change-Id: Ia5718d2bcbf9f5efea40d8250b7ad57f6d2eb2f3 Issue-ID: DCAEGEN2-1016 Signed-off-by: Zlatko Murgoski --- .../java/org/onap/dcae/commonFunction/AnyNode.java | 113 ---- .../commonFunction/ConfigProcessorAdapter.java | 45 -- .../onap/dcae/commonFunction/ConfigProcessors.java | 598 --------------------- .../java/org/onap/dcae/commonFunction/Event.java | 34 -- .../onap/dcae/commonFunction/EventProcessor.java | 61 --- .../org/onap/dcae/commonFunction/EventSender.java | 125 ----- .../org/onap/dcae/commonFunction/Processor.java | 33 -- .../dcae/commonFunction/SSLContextCreator.java | 82 --- .../org/onap/dcae/commonFunction/VESLogger.java | 160 ------ .../event/publishing/DMaaPConfigurationParser.java | 112 ---- .../event/publishing/DMaaPEventPublisher.java | 100 ---- .../event/publishing/DMaaPPublishersBuilder.java | 61 --- .../event/publishing/DMaaPPublishersCache.java | 121 ----- .../event/publishing/EventPublisher.java | 38 -- .../event/publishing/PublisherConfig.java | 99 ---- .../commonFunction/event/publishing/VavrUtils.java | 65 --- 16 files changed, 1847 deletions(-) delete mode 100644 src/main/java/org/onap/dcae/commonFunction/AnyNode.java delete mode 100644 src/main/java/org/onap/dcae/commonFunction/ConfigProcessorAdapter.java delete mode 100644 src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java delete mode 100644 src/main/java/org/onap/dcae/commonFunction/Event.java delete mode 100644 src/main/java/org/onap/dcae/commonFunction/EventProcessor.java delete mode 100644 src/main/java/org/onap/dcae/commonFunction/EventSender.java delete mode 100644 src/main/java/org/onap/dcae/commonFunction/Processor.java delete mode 100644 src/main/java/org/onap/dcae/commonFunction/SSLContextCreator.java delete mode 100644 src/main/java/org/onap/dcae/commonFunction/VESLogger.java delete mode 100644 src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java delete mode 100644 src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java delete mode 100644 src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java delete mode 100644 src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java delete mode 100644 src/main/java/org/onap/dcae/commonFunction/event/publishing/EventPublisher.java delete mode 100644 src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java delete mode 100644 src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java (limited to 'src/main/java/org/onap/dcae/commonFunction') diff --git a/src/main/java/org/onap/dcae/commonFunction/AnyNode.java b/src/main/java/org/onap/dcae/commonFunction/AnyNode.java deleted file mode 100644 index 7be45b0c..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/AnyNode.java +++ /dev/null @@ -1,113 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * PROJECT - * ================================================================================ - * Copyright (C) 2018 Nokia Networks Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.commonFunction; - -import io.vavr.collection.List; -import io.vavr.collection.Set; -import io.vavr.control.Option; -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; - -import java.util.stream.StreamSupport; - -import static io.vavr.API.Set; - -/** - * This class is a wrapper for 2 most used entities of org.json lib: JSONArray and JSONObject and comprises utility - * methods for fast access of json structures without need to explicitly coerce between them. While using this, bear in - * mind it does not contain exception handling - it is assumed that when using, the parsed json structure is known. - * - * @author koblosz - */ -public class AnyNode { - - private Object obj; - - private AnyNode(Object object) { - this.obj = object; - } - - public static AnyNode fromString(String content) { - return new AnyNode(new JSONObject(content)); - } - - /** - * Returns key set of underlying object. It is assumed that underlying object is of type org.json.JSONObject. - */ - public Set keys() { - return Set(asJsonObject().keySet().toArray(new String[]{})); - } - - /** - * Returns value associated with specified key wrapped with AnyValue object. It is assumed that this is of type - * org.json.JSONObject. - */ - public AnyNode get(String key) { - return new AnyNode(asJsonObject().get(key)); - } - - /** - * Returns string representation of this. If it happens to have null, the value is treated as - * org.json.JSONObject.NULL and "null" string is returned then. - */ - public String toString() { - return this.obj.toString(); - } - - /** - * Returns optional of object under specified key, wrapped with AnyNode object. - * If underlying object is not of type org.json.JSONObject - * or underlying object has no given key - * or given key is null - * then Optional.empty will be returned. - */ - public Option getAsOption(String key) { - try { - AnyNode value = get(key); - if (value.toString().equals("null")) { - return Option.none(); - } - return Option.some(value); - } catch (JSONException ex) { - return Option.none(); - } - } - - /** - * Converts underlying object to map representation with map values wrapped with AnyNode object. It is assumed that - * underlying object is of type org.json.JSONObject. - */ - public List toList() { - return List.ofAll(StreamSupport.stream(((JSONArray) this.obj).spliterator(), false).map(AnyNode::new)); - } - - /** - * Checks if specified key is present in this. It is assumed that this is of type JSONObject. - */ - public boolean has(String key) { - return !getAsOption(key).isEmpty(); - } - - private JSONObject asJsonObject() { - return (JSONObject) this.obj; - } - -} diff --git a/src/main/java/org/onap/dcae/commonFunction/ConfigProcessorAdapter.java b/src/main/java/org/onap/dcae/commonFunction/ConfigProcessorAdapter.java deleted file mode 100644 index 3df412ea..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/ConfigProcessorAdapter.java +++ /dev/null @@ -1,45 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcae.commonFunction; - -import java.lang.reflect.Method; -import org.json.JSONObject; - -class ConfigProcessorAdapter { - - private final ConfigProcessors configProcessors; - - ConfigProcessorAdapter(ConfigProcessors configProcessors) { - this.configProcessors = configProcessors; - } - - boolean isFilterMet(JSONObject parameter) { - return configProcessors.isFilterMet(parameter); - } - - void runConfigProcessorFunctionByName(String functionName, JSONObject parameter) - throws ReflectiveOperationException { - Method method = configProcessors.getClass() - .getDeclaredMethod(functionName, parameter.getClass()); - method.invoke(configProcessors, parameter); - } -} diff --git a/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java b/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java deleted file mode 100644 index 09ceeac7..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java +++ /dev/null @@ -1,598 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * PROJECT - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcae.commonFunction; - -import org.json.JSONArray; -import org.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.text.DecimalFormat; - -public class ConfigProcessors { - - private static final Logger log = LoggerFactory.getLogger(ConfigProcessors.class); - private static final String FIELD = "field"; - private static final String OLD_FIELD = "oldField"; - private static final String FILTER = "filter"; - private static final String VALUE = "value"; - private static final String REGEX = "\\[\\]"; - private static final String OBJECT_NOT_FOUND = "ObjectNotFound"; - private static final String FILTER_NOT_MET = "Filter not met"; - private static final String MAP_TYPE = "mapType"; - private static final String COMP_FALSE = "==false"; - - private final JSONObject event; - - public ConfigProcessors(JSONObject eventJson) { - event = eventJson; - } - - public void getValue(JSONObject jsonObject) { - - final String field = jsonObject.getString(FIELD); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - - if (filter == null || isFilterMet(filter)) { - getEventObjectVal(field); - } else - log.info(FILTER_NOT_MET); - } - - - public void setValue(JSONObject jsonObject) { - final String field = jsonObject.getString(FIELD); - final String value = jsonObject.getString(VALUE); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - if (filter == null || isFilterMet(filter)) { - setEventObjectVal(field, value); - } else - log.info(FILTER_NOT_MET); - } - - - private String evaluate(String str) { - String value = str; - if (str.startsWith("$")) { - value = (String) getEventObjectVal(str.substring(1)); - - } - return value; - } - - - public void suppressEvent(JSONObject jsonObject) { - final JSONObject filter = jsonObject.optJSONObject(FILTER); - - if (filter == null || isFilterMet(filter)) { - setEventObjectVal("suppressEvent", "true"); - } else - log.info(FILTER_NOT_MET); - } - - - public void addAttribute(JSONObject jsonObject) { - - final String field = jsonObject.getString(FIELD); - final String value = evaluate(jsonObject.getString(VALUE)); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - final String fieldType = jsonObject.optString("fieldType", "string").toLowerCase(); - - if (filter == null || isFilterMet(filter)) { - setEventObjectVal(field, value, fieldType); - } else - log.info(FILTER_NOT_MET); - } - - - public void updateAttribute(JSONObject jsonObject) { - - final String field = jsonObject.getString(FIELD); - final String value = evaluate(jsonObject.getString(VALUE)); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - if (filter == null || isFilterMet(filter)) { - setEventObjectVal(field, value); - } else - log.info(FILTER_NOT_MET); - } - - - public void removeAttribute(JSONObject jsonObject) { - - final String field = jsonObject.getString(FIELD); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - - if (filter == null || isFilterMet(filter)) { - removeEventKey(field); - } else - log.info(FILTER_NOT_MET); - } - - - private void renameArrayInArray(JSONObject jsonObject) // map - { - log.info("renameArrayInArray"); - final String field = jsonObject.getString(FIELD); - final String oldField = jsonObject.getString(OLD_FIELD); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - - if (filter == null || isFilterMet(filter)) { - - final String[] fsplit = field.split(REGEX, field.length()); - final String[] oldfsplit = oldField.split(REGEX, oldField.length()); - - final String oldValue = getEventObjectVal(oldfsplit[0]).toString(); - if (!oldValue.equals(OBJECT_NOT_FOUND)) { - final String oldArrayName = oldfsplit[1].substring(1); - final String newArrayName = fsplit[1].substring(1); - final String value = oldValue.replaceAll(oldArrayName, newArrayName); - - log.info("oldValue ==" + oldValue); - log.info("value ==" + value); - JSONArray ja = new JSONArray(value); - removeEventKey(oldfsplit[0]); - setEventObjectVal(fsplit[0], ja); - } - } else - log.info(FILTER_NOT_MET); - } - - private void renameObject(JSONObject jsonObject) // map - { - log.info("renameArrayInArray"); - final String field = jsonObject.getString(FIELD); - final String oldField = jsonObject.getString(OLD_FIELD); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - - if (filter == null || isFilterMet(filter)) { - - final JSONObject oldValue = (JSONObject) getEventObjectVal(oldField); - if (!oldValue.toString().equals(OBJECT_NOT_FOUND)) { - setEventObjectVal(field, oldValue); - removeEventKey(oldField); - } - } else - log.info(FILTER_NOT_MET); - } - - public void map(JSONObject jsonObject) { - - final String field = jsonObject.getString(FIELD); - final String mapType = jsonObject.optString(MAP_TYPE, ""); - if (field.contains("[]")) { - if (field.matches(".*\\[\\]\\..*\\[\\]")) - renameArrayInArray(jsonObject); - else - mapToJArray(jsonObject); - } - else if (mapType.equals("hashmapToNameValueArray")) - mapHashmapToNameValueArray(jsonObject); - else if (mapType.equals("nameValueArrayToHashmap")) - mapNameValueArrayToHashmap(jsonObject); - else if (mapType.equals("renameObject")) - renameObject(jsonObject); - - else - mapAttribute(jsonObject); - } - - private String performOperation(String operation, String value) { - log.info("performOperation"); - if ("convertMBtoKB".equals(operation)) { - float kbValue = Float.parseFloat(value) * 1024; - value = String.valueOf(kbValue); - } - return value; - } - - - public void mapAttribute(JSONObject jsonObject) { - - final String field = jsonObject.getString(FIELD); - final String oldField = jsonObject.getString(OLD_FIELD); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - final String operation = jsonObject.optString("operation"); - String value; - if (filter == null || isFilterMet(filter)) { - - value = getEventObjectVal(oldField).toString(); - if (!value.equals(OBJECT_NOT_FOUND)) { - if (operation != null && !operation.isEmpty()) - value = performOperation(operation, value); - - setEventObjectVal(field, value); - - removeEventKey(oldField); - } - } else - log.info(FILTER_NOT_MET); - } - - - private void mapToJArray(JSONObject jsonObject) { - log.info("mapToJArray"); - String field = jsonObject.getString(FIELD); - String oldField = jsonObject.getString(OLD_FIELD); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - final JSONObject attrMap = jsonObject.optJSONObject("attrMap"); - oldField = oldField.replaceAll(REGEX, ""); - field = field.replaceAll(REGEX, ""); - - if (filter == null || isFilterMet(filter)) { - - String value = getEventObjectVal(oldField).toString(); - if (!value.equals(OBJECT_NOT_FOUND)) { - log.info("old value ==" + value); - // update old value based on attrMap - if (attrMap != null) { - // loop thru attrMap and update attribute name to new name - for (String key : attrMap.keySet()) { - value = value.replaceAll(key, attrMap.getString(key)); - } - } - - log.info("new value ==" + value); - char c = value.charAt(0); - if (c != '[') { - // oldfield is JsonObject - JSONObject valueJO = new JSONObject(value); - // if the array already exists - String existingValue = getEventObjectVal(field).toString(); - if (!existingValue.equals(OBJECT_NOT_FOUND)) { - JSONArray ja = new JSONArray(existingValue); - JSONObject jo = ja.optJSONObject(0); - if (jo != null) { - for (String key : valueJO.keySet()) { - jo.put(key, valueJO.get(key)); - - } - ja.put(0, jo); - - setEventObjectVal(field, ja); - } - } else // if new array - setEventObjectVal(field + "[0]", new JSONObject(value), "JArray"); - } else // oldfield is jsonArray - setEventObjectVal(field, new JSONArray(value)); - - removeEventKey(oldField); - } - } else - log.info(FILTER_NOT_MET); - } - - // this method is to support the mapping 5.x to VES7.x format for additionalInformation field - private void mapNameValueArrayToHashmap(JSONObject jsonObject) { - log.info("mapNameValueArrayToHashmap"); - String field = jsonObject.getString(FIELD); - String oldField = jsonObject.getString(FIELD); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - - if (filter == null || isFilterMet(filter)) { - JSONObject newHashMap = new JSONObject(); // this will hold the newly mapped hashmap elements - JSONArray arrayValue = (JSONArray) getEventObjectVal(oldField); // old Array structure value - JSONObject tempJObj = null; - String tempName = ""; - String tempValue = ""; - if (!arrayValue.toString().equals(OBJECT_NOT_FOUND)) { - log.info("old value ==" + arrayValue.toString()); - // Loop thru the JSONArray, get the name:value pair and write to new JSONObject as hashmap elements - for (int i = 0; i < arrayValue.length(); i++) { - - tempJObj = arrayValue.getJSONObject(i); - if (tempJObj != null) { - tempName = tempJObj.get("name").toString(); - tempValue = tempJObj.get(VALUE).toString(); - newHashMap.put(tempName, tempValue); - } - } - // remove the old Array structure - removeEventKey(oldField); - //Add the new Hashmap - setEventObjectVal(field, newHashMap); - } - } else - log.info(FILTER_NOT_MET); - } - - // this method is to support the mapping 7.x to VES5.x format for additionalInformation field - private void mapHashmapToNameValueArray(JSONObject jsonObject) { - log.info("mapHashmapToNameValueArray"); - System.out.println("mapHashmapToNameValueArray"); - String field = jsonObject.getString(FIELD); - String oldField = jsonObject.getString(FIELD); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - - if (filter == null || isFilterMet(filter)) { - JSONArray newArray = new JSONArray(); // this will hold the new name:value JSONObject - JSONObject nameValJObj; - System.out.println("object ==" + getEventObjectVal(oldField).toString()); - if (!getEventObjectVal(oldField).toString().equals(OBJECT_NOT_FOUND)) { - - JSONObject hashMap = (JSONObject) getEventObjectVal(oldField); // old hashmap structure value - if (hashMap != null) { - log.info("old value ==" + hashMap.toString()); - // Loop thru the hashMap JSONObject, get the hashmap elements add them as name:value JsonObject into the newArray - for (String key : hashMap.keySet()) { - nameValJObj = new JSONObject(); //create new object so not to overwrite in memory for Array insertion - nameValJObj.put("name", key); - nameValJObj.put("value", hashMap.get(key)); - newArray.put(nameValJObj); - } - // remove the old hashMap structure - removeEventKey(oldField); - //Add the newArray containing the name:value Object - setEventObjectVal(field, newArray); - } - } - } else - log.info(FILTER_NOT_MET); - } - - /** - * example - { "functionName": "concatenateValue", "args":{ "filter": - * {"event.commonEventHeader.event":"heartbeat"}, - * FIELD:"event.commonEventHeader.eventName", "concatenate": - * ["event.commonEventHeader.domain","event.commonEventHeader.eventType","event.commonEventHeader.alarmCondition"], - * "delimiter":"_" } } - **/ - public void concatenateValue(JSONObject jsonObject) { - - final String field = jsonObject.getString(FIELD); - final String delimiter = jsonObject.getString("delimiter"); - final JSONArray values = jsonObject.getJSONArray("concatenate"); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - if (filter == null || isFilterMet(filter)) { - StringBuilder value = new StringBuilder(); - for (int i = 0; i < values.length(); i++) { - - String tempVal = evaluate(values.getString(i)); - if (!tempVal.equals(OBJECT_NOT_FOUND)) { - if (i == 0) - value.append(tempVal); - else - value.append(delimiter).append(tempVal); - } - } - - setEventObjectVal(field, value.toString()); - } else - log.info(FILTER_NOT_MET); - } - - public void subtractValue(JSONObject jsonObject) { - - final String field = jsonObject.getString(FIELD); - final JSONArray values = jsonObject.getJSONArray("subtract"); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - if (filter == null || isFilterMet(filter)) { - float value = 0; - for (int i = 0; i < values.length(); i++) { - log.info(values.getString(i)); - String tempVal = evaluate(values.getString(i)); - log.info("tempVal==" + tempVal); - if (!tempVal.equals(OBJECT_NOT_FOUND)) { - if (i == 0) - value = value + Float.valueOf(tempVal); - else - value = value - Float.valueOf(tempVal); - } - } - log.info("value ==" + value); - setEventObjectVal(field, value, "number"); - } else - log.info(FILTER_NOT_MET); - } - - - private void removeEventKey(String field) { - String[] keySet = field.split("\\.", field.length()); - JSONObject keySeries = event; - for (int i = 0; i < (keySet.length - 1); i++) { - - keySeries = keySeries.getJSONObject(keySet[i]); - } - - keySeries.remove(keySet[keySet.length - 1]); - } - - - private boolean checkFilter(JSONObject jo, String key, String logicKey) { - String filterValue = jo.getString(key); - if (filterValue.contains(":")) { - String[] splitVal = filterValue.split(":"); - if ("matches".equals(splitVal[0])) { - if ("not".equals(logicKey)) { - if (getEventObjectVal(key).toString().matches(splitVal[1])) { - log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); - return false; - } - } else { - if (!(getEventObjectVal(key).toString().matches(splitVal[1]))) { - log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); - return false; - } - } - - } - if ("contains".equals(splitVal[0])) { - if ("not".equals(logicKey)) { - if (getEventObjectVal(key).toString().contains(splitVal[1])) { - log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); - return false; - } - } else { - if (!(getEventObjectVal(key).toString().contains(splitVal[1]))) { - log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); - return false; - } - } - - } - } else { - if ("not".equals(logicKey)) { - if (getEventObjectVal(key).toString().equals(filterValue)) { - log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); - return false; - } - } else { - if (!(getEventObjectVal(key).toString().equals(filterValue))) { - log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); - return false; - } - } - } - return true; - } - - - public boolean isFilterMet(JSONObject jo) { - for (String key : jo.keySet()) { - if ("not".equals(key)) { - JSONObject njo = jo.getJSONObject(key); - for (String njoKey : njo.keySet()) { - if (!checkFilter(njo, njoKey, key)) - return false; - } - } else { - if (!checkFilter(jo, key, key)) - return false; - } - } - return true; - } - - /** - * returns a string or JSONObject or JSONArray - **/ - public Object getEventObjectVal(String keySeriesStr) { - keySeriesStr = keySeriesStr.replaceAll("\\[", "."); - keySeriesStr = keySeriesStr.replaceAll("\\]", "."); - if (keySeriesStr.contains("..")) { - keySeriesStr = keySeriesStr.replaceAll("\\.\\.", "."); - } - - if (keySeriesStr.lastIndexOf(".") == keySeriesStr.length() - 1) - keySeriesStr = keySeriesStr.substring(0, keySeriesStr.length() - 1); - String[] keySet = keySeriesStr.split("\\.", keySeriesStr.length()); - Object keySeriesObj = event; - for (String aKeySet : keySet) { - if (keySeriesObj != null) { - if (keySeriesObj instanceof String) { - - log.info("STRING==" + keySeriesObj); - } else if (keySeriesObj instanceof JSONArray) { - keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(aKeySet)); - - } else if (keySeriesObj instanceof JSONObject) { - keySeriesObj = ((JSONObject) keySeriesObj).opt(aKeySet); - - } else { - log.info("unknown object==" + keySeriesObj); - } - } - } - - if (keySeriesObj == null) - return OBJECT_NOT_FOUND; - return keySeriesObj; - } - - public void setEventObjectVal(String keySeriesStr, Object value) { - setEventObjectVal(keySeriesStr, value, "string"); - } - - /** - * returns a string or JSONObject or JSONArray - **/ - public void setEventObjectVal(String keySeriesStr, Object value, String fieldType) { - keySeriesStr = keySeriesStr.replaceAll("\\[", "."); - keySeriesStr = keySeriesStr.replaceAll("\\]", "."); - if (keySeriesStr.contains("..")) { - keySeriesStr = keySeriesStr.replaceAll("\\.\\.", "."); - } - log.info("fieldType==" + fieldType); - - if (keySeriesStr.lastIndexOf(".") == keySeriesStr.length() - 1) - keySeriesStr = keySeriesStr.substring(0, keySeriesStr.length() - 1); - String[] keySet = keySeriesStr.split("\\.", keySeriesStr.length()); - Object keySeriesObj = event; - for (int i = 0; i < (keySet.length - 1); i++) { - - if (keySeriesObj instanceof JSONArray) { - - if (((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i])) == null) // if - // the - // object - // is - // not - // there - // then - // add - // it - { - log.info("Object is null, must add it"); - if (keySet[i + 1].matches("[0-9]*")) // if index then array - ((JSONArray) keySeriesObj).put(Integer.parseInt(keySet[i]), new JSONArray()); - else - ((JSONArray) keySeriesObj).put(Integer.parseInt(keySet[i]), new JSONObject()); - } - keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i])); - - } else if (keySeriesObj instanceof JSONObject) { - if (((JSONObject) keySeriesObj).opt(keySet[i]) == null) // if - // the - // object - // is - // not - // there - // then - // add - // it - { - if (keySet[i + 1].matches("[0-9]*")) // if index then array - ((JSONObject) keySeriesObj).put(keySet[i], new JSONArray()); - else - ((JSONObject) keySeriesObj).put(keySet[i], new JSONObject()); - log.info("Object is null, must add it"); - } - keySeriesObj = ((JSONObject) keySeriesObj).opt(keySet[i]); - } else { - log.info("unknown object==" + keySeriesObj); - } - } - if ("number".equals(fieldType)) { - DecimalFormat df = new DecimalFormat("#.0"); - if (value instanceof String) - ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], - Float.valueOf(df.format(Float.valueOf((String) value)))); - else - ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], Float.valueOf(df.format(value))); - } else if ("integer".equals(fieldType) && value instanceof String) - ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], Integer.valueOf((String) value)); - else if ("JArray".equals(fieldType)) - ((JSONArray) keySeriesObj).put(value); - else - ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], value); - - } -} diff --git a/src/main/java/org/onap/dcae/commonFunction/Event.java b/src/main/java/org/onap/dcae/commonFunction/Event.java deleted file mode 100644 index faae2451..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/Event.java +++ /dev/null @@ -1,34 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * PROJECT - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.commonFunction; - -import com.google.gson.JsonObject; - -import java.util.List; - -class Event { - final JsonObject filter; - final List processors; - - Event(JsonObject filter, List processors) { - this.filter = filter; - this.processors = processors; - } -} diff --git a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java deleted file mode 100644 index bd830456..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java +++ /dev/null @@ -1,61 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * PROJECT - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcae.commonFunction; - -import com.att.nsa.clock.SaClock; -import com.att.nsa.logging.LoggingContext; -import com.att.nsa.logging.log4j.EcompFields; -import org.json.JSONObject; -import org.onap.dcae.VesApplication; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class EventProcessor implements Runnable { - - private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); - private EventSender eventSender; - - public EventProcessor(EventSender eventSender) { - this.eventSender = eventSender; - } - - @Override - public void run() { - try { - while (true){ - JSONObject event = VesApplication.fProcessingInputQueue.take(); - log.info("QueueSize:" + VesApplication.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event); - setLoggingContext(event); - log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + eventSender.getDomain(event)); - eventSender.send(event); - log.debug("Message published" + event); - } - } catch (InterruptedException e) { - log.error("EventProcessor InterruptedException" + e.getMessage()); - Thread.currentThread().interrupt(); - } - } - - private void setLoggingContext(JSONObject event) { - LoggingContext localLC = VESLogger.getLoggingContextForThread(event.get("VESuniqueId").toString()); - localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - } -} \ No newline at end of file diff --git a/src/main/java/org/onap/dcae/commonFunction/EventSender.java b/src/main/java/org/onap/dcae/commonFunction/EventSender.java deleted file mode 100644 index 8a9c1eca..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/EventSender.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * PROJECT - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved.s - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.commonFunction; - -import com.google.common.reflect.TypeToken; -import com.google.gson.Gson; -import io.vavr.collection.Map; -import io.vavr.control.Option; -import java.io.FileReader; -import java.io.IOException; -import java.lang.reflect.Type; -import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.Date; -import java.util.List; -import java.util.Optional; -import org.json.JSONObject; -import org.onap.dcae.ApplicationSettings; -import org.onap.dcae.commonFunction.event.publishing.EventPublisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class EventSender { - - private Map streamidHash; - private ApplicationSettings properties; - private EventPublisher eventPublisher; - - static final Type EVENT_LIST_TYPE = new TypeToken>() {}.getType(); - private static final Logger log = LoggerFactory.getLogger(EventSender.class); - private static final String EVENT_LITERAL = "event"; - private static final String COMMON_EVENT_HEADER = "commonEventHeader"; - private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); - - public EventSender( EventPublisher eventPublisher, ApplicationSettings properties) { - this.eventPublisher = eventPublisher; - this.streamidHash = properties.dMaaPStreamsMapping(); - this.properties = properties; - - } - - public void send(JSONObject event) { - streamidHash.get(getDomain(event)) - .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + event)) - .forEach(streamIds -> sendEventsToStreams(event, streamIds)); - } - - public static String getDomain(JSONObject event) { - return event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain"); - } - - private void sendEventsToStreams(JSONObject event, String[] streamIdList) { - for (String aStreamIdList : streamIdList) { - log.info("Invoking publisher for streamId:" + aStreamIdList); - eventPublisher.sendEvent(overrideEvent(event), aStreamIdList); - } - } - - private JSONObject overrideEvent(JSONObject event) { - JSONObject jsonObject = addCurrentTimeToEvent(event); - if (properties.eventTransformingEnabled()) { - try (FileReader fr = new FileReader("./etc/eventTransform.json")) { - log.info("parse eventTransform.json"); - List events = new Gson().fromJson(fr, EVENT_LIST_TYPE); - parseEventsJson(events, new ConfigProcessorAdapter(new ConfigProcessors(jsonObject))); - } catch (IOException e) { - log.error("Couldn't find file ./etc/eventTransform.json" + e.toString()); - } - } - if (jsonObject.has("VESversion")) - jsonObject.remove("VESversion"); - - log.debug("Modified event:" + jsonObject); - return jsonObject; - } - - private JSONObject addCurrentTimeToEvent(JSONObject event) { - final Date currentTime = new Date(); - JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", dateFormat.format(currentTime)); - JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER); - commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp); - event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey); - return event; - } - - private void parseEventsJson(List eventsTransform, ConfigProcessorAdapter configProcessorAdapter) { - for (Event eventTransform : eventsTransform) { - JSONObject filterObj = new JSONObject(eventTransform.filter.toString()); - if (configProcessorAdapter.isFilterMet(filterObj)) { - callProcessorsMethod(configProcessorAdapter, eventTransform.processors); - } - } - } - - private void callProcessorsMethod(ConfigProcessorAdapter configProcessorAdapter, List processors) { - for (Processor processor : processors) { - final String functionName = processor.functionName; - final JSONObject args = new JSONObject(processor.args.toString()); - log.info(String.format("functionName==%s | args==%s", functionName, args)); - try { - configProcessorAdapter.runConfigProcessorFunctionByName(functionName, args); - } catch (ReflectiveOperationException e) { - log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause()); - } - } - } -} diff --git a/src/main/java/org/onap/dcae/commonFunction/Processor.java b/src/main/java/org/onap/dcae/commonFunction/Processor.java deleted file mode 100644 index ea79f1d3..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/Processor.java +++ /dev/null @@ -1,33 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * PROJECT - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcae.commonFunction; - -import com.google.gson.JsonObject; - -class Processor { - final String functionName; - final JsonObject args; - - Processor(String functionName, JsonObject args) { - this.functionName = functionName; - this.args = args; - } -} diff --git a/src/main/java/org/onap/dcae/commonFunction/SSLContextCreator.java b/src/main/java/org/onap/dcae/commonFunction/SSLContextCreator.java deleted file mode 100644 index 29e974ef..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/SSLContextCreator.java +++ /dev/null @@ -1,82 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * PROJECT - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcae.commonFunction; - -import org.springframework.boot.web.server.Ssl; - -import java.nio.file.Path; - -public class SSLContextCreator { - private final String keyStorePassword; - private final String certAlias; - private final Path keyStoreFile; - - private Path trustStoreFile; - private String trustStorePassword; - private boolean hasTlsClientAuthentication = false; - - public static SSLContextCreator create(final Path keyStoreFile, final String certAlias, final String password) { - return new SSLContextCreator(keyStoreFile, certAlias, password); - } - - private SSLContextCreator(final Path keyStoreFile, final String certAlias, final String password) { - this.certAlias = certAlias; - this.keyStoreFile = keyStoreFile; - this.keyStorePassword = password; - } - - public SSLContextCreator withTlsClientAuthentication(final Path trustStoreFile, final String password) { - hasTlsClientAuthentication = true; - this.trustStoreFile = trustStoreFile; - this.trustStorePassword = password; - - return this; - } - - private void configureKeyStore(final Ssl ssl) { - final String keyStore = keyStoreFile.toAbsolutePath().toString(); - - ssl.setKeyStore(keyStore); - ssl.setKeyPassword(keyStorePassword); - ssl.setKeyAlias(certAlias); - } - - private void configureTrustStore(final Ssl ssl) { - final String trustStore = trustStoreFile.toAbsolutePath().toString(); - - ssl.setTrustStore(trustStore); - ssl.setTrustStorePassword(trustStorePassword); - ssl.setClientAuth(Ssl.ClientAuth.NEED); - } - - public Ssl build() { - final Ssl ssl = new Ssl(); - ssl.setEnabled(true); - - configureKeyStore(ssl); - - if (hasTlsClientAuthentication) { - configureTrustStore(ssl); - } - - return ssl; - } -} \ No newline at end of file diff --git a/src/main/java/org/onap/dcae/commonFunction/VESLogger.java b/src/main/java/org/onap/dcae/commonFunction/VESLogger.java deleted file mode 100644 index 2a392e81..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/VESLogger.java +++ /dev/null @@ -1,160 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * PROJECT - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcae.commonFunction; - -import com.att.nsa.clock.SaClock; -import com.att.nsa.logging.LoggingContext; -import com.att.nsa.logging.LoggingContextFactory; -import com.att.nsa.logging.log4j.EcompFields; -import jline.internal.Log; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.UUID; - -public class VESLogger { - - public static final String VES_AGENT = "VES_AGENT"; - public static final String REQUEST_ID = "requestId"; - private static final String IP_ADDRESS = "127.0.0.1"; - private static final String HOST_NAME = "localhost"; - - public static Logger auditLog; - public static Logger metricsLog; - public static Logger errorLog; - public static Logger debugLog; - - // Common LoggingContext - private static LoggingContext commonLC; - // Thread-specific LoggingContext - private static LoggingContext threadLC; - public LoggingContext lc; - - /** - * Returns the common LoggingContext instance that is the base context for - * all subsequent instances. - * - * @return the common LoggingContext - */ - public static LoggingContext getCommonLoggingContext() { - if (commonLC == null) { - commonLC = new LoggingContextFactory.Builder().build(); - final UUID uuid = UUID.randomUUID(); - - commonLC.put(REQUEST_ID, uuid.toString()); - } - return commonLC; - } - - /** - * Get a logging context for the current thread that's based on the common - * logging context. Populate the context with context-specific values. - * - * @param aUuid uuid for request id - * @return a LoggingContext for the current thread - */ - public static LoggingContext getLoggingContextForThread(UUID aUuid) { - // note that this operation requires everything from the common context - // to be (re)copied into the target context. That seems slow, but it - // actually - // helps prevent the thread from overwriting supposedly common data. It - // also - // should be fairly quick compared with the overhead of handling the - // actual - // service call. - - threadLC = new LoggingContextFactory.Builder().withBaseContext(getCommonLoggingContext()).build(); - // Establish the request-specific UUID, as long as we are here... - threadLC.put(REQUEST_ID, aUuid.toString()); - threadLC.put(EcompFields.kEndTimestamp, SaClock.now()); - - return threadLC; - } - - /** - * Get a logging context for the current thread that's based on the common - * logging context. Populate the context with context-specific values. - * - * @param aUuid uuid for request id - * @return a LoggingContext for the current thread - */ - public static LoggingContext getLoggingContextForThread(String aUuid) { - // note that this operation requires everything from the common context - // to be (re)copied into the target context. That seems slow, but it - // actually - // helps prevent the thread from overwriting supposedly common data. It - // also - // should be fairly quick compared with the overhead of handling the - // actual - // service call. - - threadLC = new LoggingContextFactory.Builder().withBaseContext(getCommonLoggingContext()).build(); - // Establish the request-specific UUID, as long as we are here... - threadLC.put(REQUEST_ID, aUuid); - threadLC.put("statusCode", "COMPLETE"); - threadLC.put(EcompFields.kEndTimestamp, SaClock.now()); - return threadLC; - } - - public static void setUpEcompLogging() { - - // Create ECOMP Logger instances - auditLog = LoggerFactory.getLogger("com.att.ecomp.audit"); - metricsLog = LoggerFactory.getLogger("com.att.ecomp.metrics"); - debugLog = LoggerFactory.getLogger("com.att.ecomp.debug"); - errorLog = LoggerFactory.getLogger("com.att.ecomp.error"); - - final LoggingContext lc = getCommonLoggingContext(); - - String ipAddr = IP_ADDRESS; - String hostname = HOST_NAME; - try { - final InetAddress ip = InetAddress.getLocalHost(); - hostname = ip.getCanonicalHostName(); - ipAddr = ip.getHostAddress(); - } catch (UnknownHostException x) { - Log.debug(x.getMessage()); - } - - lc.put("serverName", hostname); - lc.put("serviceName", "VESCollecor"); - lc.put("statusCode", "RUNNING"); - lc.put("targetEntity", "NULL"); - lc.put("targetServiceName", "NULL"); - lc.put("server", hostname); - lc.put("serverIpAddress", ipAddr); - - // instance UUID is meaningless here, so we just create a new one each - // time the - // server starts. One could argue each new instantiation of the service - // should - // have a new instance ID. - lc.put("instanceUuid", ""); - lc.put("severity", ""); - lc.put(EcompFields.kEndTimestamp, SaClock.now()); - lc.put("EndTimestamp", SaClock.now()); - lc.put("partnerName", "NA"); - } - -} diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java deleted file mode 100644 index 91db5172..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java +++ /dev/null @@ -1,112 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.commonFunction.event.publishing; - -import io.vavr.collection.List; -import io.vavr.collection.Map; -import io.vavr.control.Option; -import io.vavr.control.Try; -import org.onap.dcae.commonFunction.AnyNode; - -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Path; - -import org.json.JSONObject; - -import static io.vavr.API.*; -import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError; -import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; - -/** - * @author Pawel Szalapski (pawel.szalapski@nokia.com) - */ -@SuppressWarnings("mapFailure takes a generic varargs, unchecked because of Javas type system limitation, actually safe to do") -public final class DMaaPConfigurationParser { - - public static Try> parseToDomainMapping(Path configLocation) { - return readFromFile(configLocation) - .flatMap(DMaaPConfigurationParser::toJSON) - .flatMap(DMaaPConfigurationParser::toConfigMap); - } - - public static Try> parseToDomainMapping(JSONObject config) { - return toJSON(config.toString()) - .flatMap(DMaaPConfigurationParser::toConfigMap); - } - - private static Try readFromFile(Path configLocation) { - return Try(() -> new String(Files.readAllBytes(configLocation))) - .mapFailure(enhanceError(f("Could not read DMaaP configuration from location: '%s'", configLocation))); - } - - private static Try toJSON(String config) { - return Try(() -> AnyNode.fromString(config)) - .mapFailure(enhanceError(f("DMaaP configuration '%s' is not a valid JSON document", config))); - } - - private static Try> toConfigMap(AnyNode config) { - return Try(() -> usesLegacyFormat(config) ? parseLegacyFormat(config) : parseNewFormat(config)) - .mapFailure(enhanceError(f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config))); - } - - private static boolean usesLegacyFormat(AnyNode dMaaPConfig) { - return dMaaPConfig.has("channels"); - } - - private static Map parseLegacyFormat(AnyNode root) { - return root.get("channels").toList().toMap( - channel -> channel.get("name").toString(), - channel -> { - String destinationsStr = channel.getAsOption("cambria.url") - .getOrElse(channel.getAsOption("cambria.hosts").get()) - .toString(); - String topic = channel.get("cambria.topic").toString(); - Option maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString); - Option maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString); - List destinations = List(destinationsStr.split(",")); - return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations); - }); - } - - private static Map parseNewFormat(AnyNode root) { - return root.keys().toMap( - channelName -> channelName, - channelName -> { - AnyNode channelConfig = root.get(channelName); - Option maybeUser = channelConfig.getAsOption("aaf_username").map(AnyNode::toString); - Option maybePassword = channelConfig.getAsOption("aaf_password").map(AnyNode::toString); - URL topicURL = unchecked( - () -> new URL(channelConfig.get("dmaap_info").get("topic_url").toString())).apply(); - String[] pathSegments = topicURL.getPath().substring(1).split("/"); - String topic = pathSegments[1]; - String destination = "events".equals(pathSegments[0]) ? topicURL.getAuthority() : topicURL.getHost(); - List destinations = List(destination); - return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations); - }); - } - - private static PublisherConfig buildBasedOnAuth(Option maybeUser, Option maybePassword, - String topic, List destinations) { - return maybeUser.flatMap(user -> maybePassword.map(password -> Tuple(user, password))) - .map(credentials -> new PublisherConfig(destinations, topic, credentials._1, credentials._2)) - .getOrElse(new PublisherConfig(destinations, topic)); - } -} diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java deleted file mode 100644 index a0ee3bfb..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java +++ /dev/null @@ -1,100 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcae.commonFunction.event.publishing; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.clock.SaClock; -import com.att.nsa.logging.LoggingContext; -import com.att.nsa.logging.log4j.EcompFields; -import io.vavr.collection.Map; -import io.vavr.control.Try; -import org.json.JSONObject; -import org.onap.dcae.commonFunction.VESLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; - -/** - * @author Pawel Szalapski (pawel.szalapski@nokia.com) - */ -class DMaaPEventPublisher implements EventPublisher { - private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100; - private static final String VES_UNIQUE_ID = "VESuniqueId"; - private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class); - private final DMaaPPublishersCache publishersCache; - private final Logger outputLogger; - - DMaaPEventPublisher(DMaaPPublishersCache DMaaPPublishersCache, - Logger outputLogger) { - this.publishersCache = DMaaPPublishersCache; - this.outputLogger = outputLogger; - } - - @Override - public void sendEvent(JSONObject event, String domain) { - clearVesUniqueIdFromEvent(event); - publishersCache.getPublisher(domain) - .onEmpty(() -> - log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event))) - .forEach(publisher -> sendEvent(event, domain, publisher)); - } - - @Override - public void reconfigure(Map dMaaPConfig) { - publishersCache.reconfigure(dMaaPConfig); - } - - private void sendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) { - Try.run(() -> uncheckedSendEvent(event, domain, publisher)) - .onFailure(exc -> closePublisher(event, domain, exc)); - } - - private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) - throws IOException { - int pendingMsgs = publisher.send("MyPartitionKey", event.toString()); - if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) { - log.info("Pending messages count: " + pendingMsgs); - } - String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, domain); - log.info(infoMsg); - outputLogger.info(infoMsg); - } - - private void closePublisher(JSONObject event, String domain, Throwable e) { - log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.", - event, domain), e); - publishersCache.closePublisherFor(domain); - } - - private void clearVesUniqueIdFromEvent(JSONObject event) { - if (event.has(VES_UNIQUE_ID)) { - String uuid = event.get(VES_UNIQUE_ID).toString(); - LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); - localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - log.debug("Removing VESuniqueid object from event"); - event.remove(VES_UNIQUE_ID); - } - } -} diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java deleted file mode 100644 index 4f672715..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java +++ /dev/null @@ -1,61 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.commonFunction.event.publishing; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; -import io.vavr.control.Try; - -import static io.vavr.API.Try; -import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError; -import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; - -/** - * @author Pawel Szalapski (pawel.szalapski@nokia.com) - */ -final class DMaaPPublishersBuilder { - - static Try buildPublisher(PublisherConfig config) { - return Try(() -> builder(config).build()) - .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config))); - } - - private static PublisherBuilder builder(PublisherConfig config) { - if (config.isSecured()) { - return authenticatedBuilder(config); - } else { - return unAuthenticatedBuilder(config); - } - } - - private static PublisherBuilder authenticatedBuilder(PublisherConfig config) { - return unAuthenticatedBuilder(config) - .usingHttps() - .authenticatedByHttp(config.userName().get(), config.password().get()); - } - - private static PublisherBuilder unAuthenticatedBuilder(PublisherConfig config) { - return new CambriaClientBuilders.PublisherBuilder() - .usingHosts(config.destinations().mkString(",")) - .onTopic(config.topic()) - .logSendFailuresAfter(5); - } -} diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java deleted file mode 100644 index c66cee05..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java +++ /dev/null @@ -1,121 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.commonFunction.event.publishing; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.google.common.cache.*; -import io.vavr.collection.Map; -import io.vavr.control.Option; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nonnull; -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import static io.vavr.API.Option; -import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; - -/** - * @author Pawel Szalapski (pawel.szalapski@nokia.com) - */ -class DMaaPPublishersCache { - - private static final Logger log = LoggerFactory.getLogger(DMaaPPublishersCache.class); - private final LoadingCache publishersCache; - private AtomicReference> dMaaPConfiguration; - - DMaaPPublishersCache(Map dMaaPConfiguration) { - this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); - this.publishersCache = CacheBuilder.newBuilder() - .removalListener(new OnPublisherRemovalListener()) - .build(new CambriaPublishersCacheLoader()); - } - - DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader, - OnPublisherRemovalListener onPublisherRemovalListener, - Map dMaaPConfiguration) { - this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); - this.publishersCache = CacheBuilder.newBuilder() - .removalListener(onPublisherRemovalListener) - .build(dMaaPPublishersCacheLoader); - } - - Option getPublisher(String streamID) { - try { - return Option(publishersCache.getUnchecked(streamID)); - } catch (Exception e) { - log.warn("Could not create / load Cambria Publisher for streamID", e); - return Option.none(); - } - } - - void closePublisherFor(String streamId) { - publishersCache.invalidate(streamId); - } - - synchronized void reconfigure(Map newConfig) { - Map currentConfig = dMaaPConfiguration.get(); - Map removedConfigurations = currentConfig - .filterKeys(domain -> !newConfig.containsKey(domain)); - Map changedConfigurations = newConfig - .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e))); - dMaaPConfiguration.set(newConfig); - removedConfigurations.merge(changedConfigurations).forEach(e -> publishersCache.invalidate(e._1)); - } - - static class OnPublisherRemovalListener implements RemovalListener { - - @Override - public void onRemoval(@Nonnull RemovalNotification notification) { - CambriaBatchingPublisher publisher = notification.getValue(); - if (publisher != null) { // The value might get Garbage Collected at this moment, regardless of @Nonnull - try { - int timeout = 20; - TimeUnit unit = TimeUnit.SECONDS; - java.util.List stuck = publisher.close(timeout, unit); - if (!stuck.isEmpty()) { - log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', " - + "%s messages were dropped", stuck.size(), timeout, unit)); - } - } catch (InterruptedException | IOException e) { - log.error("Could not close Cambria publisher, some messages might have been dropped", e); - Thread.currentThread().interrupt(); - } - } - } - } - - class CambriaPublishersCacheLoader extends CacheLoader { - - @Override - public CambriaBatchingPublisher load(@Nonnull String domain) { - return dMaaPConfiguration.get() - .get(domain) - .toTry(() -> new RuntimeException( - f("DMaaP configuration contains no configuration for domain: '%s'", domain))) - .flatMap(DMaaPPublishersBuilder::buildPublisher) - .get(); - } - } - -} diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/EventPublisher.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/EventPublisher.java deleted file mode 100644 index 9cd718f8..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/EventPublisher.java +++ /dev/null @@ -1,38 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.commonFunction.event.publishing; - -import io.vavr.collection.Map; -import org.json.JSONObject; -import org.slf4j.Logger; - -/** - * @author Pawel Szalapski (pawel.szalapski@nokia.com) - */ -public interface EventPublisher { - - static EventPublisher createPublisher(Logger outputLogger, Map dMaaPConfig) { - return new DMaaPEventPublisher(new DMaaPPublishersCache(dMaaPConfig), outputLogger); - } - - void sendEvent(JSONObject event, String domain); - - void reconfigure(Map dMaaPConfig); -} diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java deleted file mode 100644 index f1cbb8e5..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java +++ /dev/null @@ -1,99 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.commonFunction.event.publishing; - -import io.vavr.collection.List; -import io.vavr.control.Option; - -import java.util.Objects; - -/** - * @author Pawel Szalapski (pawel.szalapski@nokia.com) - */ -public final class PublisherConfig { - - private final List destinations; - private final String topic; - private String userName; - private String password; - - PublisherConfig(List destinations, String topic) { - this.destinations = destinations; - this.topic = topic; - } - - PublisherConfig(List destinations, String topic, String userName, String password) { - this.destinations = destinations; - this.topic = topic; - this.userName = userName; - this.password = password; - } - - List destinations() { - return destinations; - } - - String topic() { - return topic; - } - - Option userName() { - return Option.of(userName); - } - - Option password() { - return Option.of(password); - } - - boolean isSecured() { - return userName().isDefined() && password().isDefined(); - } - - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - PublisherConfig that = (PublisherConfig) o; - return Objects.equals(destinations, that.destinations) && - Objects.equals(topic, that.topic) && - Objects.equals(userName, that.userName) && - Objects.equals(password, that.password); - } - - @Override - public int hashCode() { - return Objects.hash(destinations, topic, userName, password); - } - - @Override - public String toString() { - return "PublisherConfig{" + - "destinations=" + destinations + - ", topic='" + topic + '\'' + - ", userName='" + userName + '\'' + - ", password='" + password + '\'' + - '}'; - } -} diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java deleted file mode 100644 index 7d535a21..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java +++ /dev/null @@ -1,65 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.commonFunction.event.publishing; - -import io.vavr.API; -import io.vavr.API.Match.Case; -import io.vavr.Function0; -import io.vavr.Function1; -import java.util.function.Consumer; -import java.util.function.Function; -import org.slf4j.Logger; - -import static io.vavr.API.$; - -/** - * @author Pawel Szalapski (pawel.szalapski@nokia.com) - */ -public final class VavrUtils { - - private VavrUtils() { - // utils aggregator - } - - /** - * Shortcut for 'string interpolation' - */ - public static String f(String msg, Object... args) { - return String.format(msg, args); - } - - /** - * Wrap failure with a more descriptive message of what has failed and chain original cause. Used to provide a - * context for errors instead of raw exception. - */ - public static Case enhanceError(String msg) { - return API.Case($(), e -> new RuntimeException(msg, e)); - } - - public static Case enhanceError(String pattern, Object... arguments) { - return API.Case($(), e -> new RuntimeException(f(pattern, arguments), e)); - } - - public static Consumer logError(Logger withLogger) { - return e -> withLogger.error(e.getMessage(), e); - } - - -} -- cgit 1.2.3-korg