diff options
Diffstat (limited to 'src/main/java/org/onap/dcae/common')
16 files changed, 1841 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dcae/common/AnyNode.java b/src/main/java/org/onap/dcae/common/AnyNode.java new file mode 100644 index 00000000..a68e6299 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/AnyNode.java @@ -0,0 +1,113 @@ +/*- + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2018 Nokia Networks Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common; + +import io.vavr.collection.List; +import io.vavr.collection.Set; +import io.vavr.control.Option; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; + +import java.util.stream.StreamSupport; + +import static io.vavr.API.Set; + +/** + * This class is a wrapper for 2 most used entities of org.json lib: JSONArray and JSONObject and comprises utility + * methods for fast access of json structures without need to explicitly coerce between them. While using this, bear in + * mind it does not contain exception handling - it is assumed that when using, the parsed json structure is known. + * + * @author koblosz + */ +public class AnyNode { + + private Object obj; + + private AnyNode(Object object) { + this.obj = object; + } + + public static AnyNode fromString(String content) { + return new AnyNode(new JSONObject(content)); + } + + /** + * Returns key set of underlying object. It is assumed that underlying object is of type org.json.JSONObject. + */ + public Set<String> keys() { + return Set(asJsonObject().keySet().toArray(new String[]{})); + } + + /** + * Returns value associated with specified key wrapped with AnyValue object. It is assumed that this is of type + * org.json.JSONObject. + */ + public AnyNode get(String key) { + return new AnyNode(asJsonObject().get(key)); + } + + /** + * Returns string representation of this. If it happens to have null, the value is treated as + * org.json.JSONObject.NULL and "null" string is returned then. + */ + public String toString() { + return this.obj.toString(); + } + + /** + * Returns optional of object under specified key, wrapped with AnyNode object. + * If underlying object is not of type org.json.JSONObject + * or underlying object has no given key + * or given key is null + * then Optional.empty will be returned. + */ + public Option<AnyNode> getAsOption(String key) { + try { + AnyNode value = get(key); + if (value.toString().equals("null")) { + return Option.none(); + } + return Option.some(value); + } catch (JSONException ex) { + return Option.none(); + } + } + + /** + * Converts underlying object to map representation with map values wrapped with AnyNode object. It is assumed that + * underlying object is of type org.json.JSONObject. + */ + public List<AnyNode> toList() { + return List.ofAll(StreamSupport.stream(((JSONArray) this.obj).spliterator(), false).map(AnyNode::new)); + } + + /** + * Checks if specified key is present in this. It is assumed that this is of type JSONObject. + */ + public boolean has(String key) { + return !getAsOption(key).isEmpty(); + } + + private JSONObject asJsonObject() { + return (JSONObject) this.obj; + } + +} diff --git a/src/main/java/org/onap/dcae/common/ConfigProcessorAdapter.java b/src/main/java/org/onap/dcae/common/ConfigProcessorAdapter.java new file mode 100644 index 00000000..0a93bb8e --- /dev/null +++ b/src/main/java/org/onap/dcae/common/ConfigProcessorAdapter.java @@ -0,0 +1,45 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.common; + +import java.lang.reflect.Method; +import org.json.JSONObject; + +class ConfigProcessorAdapter { + + private final ConfigProcessors configProcessors; + + ConfigProcessorAdapter(ConfigProcessors configProcessors) { + this.configProcessors = configProcessors; + } + + boolean isFilterMet(JSONObject parameter) { + return configProcessors.isFilterMet(parameter); + } + + void runConfigProcessorFunctionByName(String functionName, JSONObject parameter) + throws ReflectiveOperationException { + Method method = configProcessors.getClass() + .getDeclaredMethod(functionName, parameter.getClass()); + method.invoke(configProcessors, parameter); + } +} diff --git a/src/main/java/org/onap/dcae/common/ConfigProcessors.java b/src/main/java/org/onap/dcae/common/ConfigProcessors.java new file mode 100644 index 00000000..c459bb1c --- /dev/null +++ b/src/main/java/org/onap/dcae/common/ConfigProcessors.java @@ -0,0 +1,598 @@ +/*- + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.common; + +import org.json.JSONArray; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.DecimalFormat; + +public class ConfigProcessors { + + private static final Logger log = LoggerFactory.getLogger(ConfigProcessors.class); + private static final String FIELD = "field"; + private static final String OLD_FIELD = "oldField"; + private static final String FILTER = "filter"; + private static final String VALUE = "value"; + private static final String REGEX = "\\[\\]"; + private static final String OBJECT_NOT_FOUND = "ObjectNotFound"; + private static final String FILTER_NOT_MET = "Filter not met"; + private static final String MAP_TYPE = "mapType"; + private static final String COMP_FALSE = "==false"; + + private final JSONObject event; + + public ConfigProcessors(JSONObject eventJson) { + event = eventJson; + } + + public void getValue(JSONObject jsonObject) { + + final String field = jsonObject.getString(FIELD); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + + if (filter == null || isFilterMet(filter)) { + getEventObjectVal(field); + } else + log.info(FILTER_NOT_MET); + } + + + public void setValue(JSONObject jsonObject) { + final String field = jsonObject.getString(FIELD); + final String value = jsonObject.getString(VALUE); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + if (filter == null || isFilterMet(filter)) { + setEventObjectVal(field, value); + } else + log.info(FILTER_NOT_MET); + } + + + private String evaluate(String str) { + String value = str; + if (str.startsWith("$")) { + value = (String) getEventObjectVal(str.substring(1)); + + } + return value; + } + + + public void suppressEvent(JSONObject jsonObject) { + final JSONObject filter = jsonObject.optJSONObject(FILTER); + + if (filter == null || isFilterMet(filter)) { + setEventObjectVal("suppressEvent", "true"); + } else + log.info(FILTER_NOT_MET); + } + + + public void addAttribute(JSONObject jsonObject) { + + final String field = jsonObject.getString(FIELD); + final String value = evaluate(jsonObject.getString(VALUE)); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + final String fieldType = jsonObject.optString("fieldType", "string").toLowerCase(); + + if (filter == null || isFilterMet(filter)) { + setEventObjectVal(field, value, fieldType); + } else + log.info(FILTER_NOT_MET); + } + + + public void updateAttribute(JSONObject jsonObject) { + + final String field = jsonObject.getString(FIELD); + final String value = evaluate(jsonObject.getString(VALUE)); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + if (filter == null || isFilterMet(filter)) { + setEventObjectVal(field, value); + } else + log.info(FILTER_NOT_MET); + } + + + public void removeAttribute(JSONObject jsonObject) { + + final String field = jsonObject.getString(FIELD); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + + if (filter == null || isFilterMet(filter)) { + removeEventKey(field); + } else + log.info(FILTER_NOT_MET); + } + + + private void renameArrayInArray(JSONObject jsonObject) // map + { + log.info("renameArrayInArray"); + final String field = jsonObject.getString(FIELD); + final String oldField = jsonObject.getString(OLD_FIELD); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + + if (filter == null || isFilterMet(filter)) { + + final String[] fsplit = field.split(REGEX, field.length()); + final String[] oldfsplit = oldField.split(REGEX, oldField.length()); + + final String oldValue = getEventObjectVal(oldfsplit[0]).toString(); + if (!oldValue.equals(OBJECT_NOT_FOUND)) { + final String oldArrayName = oldfsplit[1].substring(1); + final String newArrayName = fsplit[1].substring(1); + final String value = oldValue.replaceAll(oldArrayName, newArrayName); + + log.info("oldValue ==" + oldValue); + log.info("value ==" + value); + JSONArray ja = new JSONArray(value); + removeEventKey(oldfsplit[0]); + setEventObjectVal(fsplit[0], ja); + } + } else + log.info(FILTER_NOT_MET); + } + + private void renameObject(JSONObject jsonObject) // map + { + log.info("renameArrayInArray"); + final String field = jsonObject.getString(FIELD); + final String oldField = jsonObject.getString(OLD_FIELD); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + + if (filter == null || isFilterMet(filter)) { + + final JSONObject oldValue = (JSONObject) getEventObjectVal(oldField); + if (!oldValue.toString().equals(OBJECT_NOT_FOUND)) { + setEventObjectVal(field, oldValue); + removeEventKey(oldField); + } + } else + log.info(FILTER_NOT_MET); + } + + public void map(JSONObject jsonObject) { + + final String field = jsonObject.getString(FIELD); + final String mapType = jsonObject.optString(MAP_TYPE, ""); + if (field.contains("[]")) { + if (field.matches(".*\\[\\]\\..*\\[\\]")) + renameArrayInArray(jsonObject); + else + mapToJArray(jsonObject); + } + else if (mapType.equals("hashmapToNameValueArray")) + mapHashmapToNameValueArray(jsonObject); + else if (mapType.equals("nameValueArrayToHashmap")) + mapNameValueArrayToHashmap(jsonObject); + else if (mapType.equals("renameObject")) + renameObject(jsonObject); + + else + mapAttribute(jsonObject); + } + + private String performOperation(String operation, String value) { + log.info("performOperation"); + if ("convertMBtoKB".equals(operation)) { + float kbValue = Float.parseFloat(value) * 1024; + value = String.valueOf(kbValue); + } + return value; + } + + + public void mapAttribute(JSONObject jsonObject) { + + final String field = jsonObject.getString(FIELD); + final String oldField = jsonObject.getString(OLD_FIELD); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + final String operation = jsonObject.optString("operation"); + String value; + if (filter == null || isFilterMet(filter)) { + + value = getEventObjectVal(oldField).toString(); + if (!value.equals(OBJECT_NOT_FOUND)) { + if (operation != null && !operation.isEmpty()) + value = performOperation(operation, value); + + setEventObjectVal(field, value); + + removeEventKey(oldField); + } + } else + log.info(FILTER_NOT_MET); + } + + + private void mapToJArray(JSONObject jsonObject) { + log.info("mapToJArray"); + String field = jsonObject.getString(FIELD); + String oldField = jsonObject.getString(OLD_FIELD); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + final JSONObject attrMap = jsonObject.optJSONObject("attrMap"); + oldField = oldField.replaceAll(REGEX, ""); + field = field.replaceAll(REGEX, ""); + + if (filter == null || isFilterMet(filter)) { + + String value = getEventObjectVal(oldField).toString(); + if (!value.equals(OBJECT_NOT_FOUND)) { + log.info("old value ==" + value); + // update old value based on attrMap + if (attrMap != null) { + // loop thru attrMap and update attribute name to new name + for (String key : attrMap.keySet()) { + value = value.replaceAll(key, attrMap.getString(key)); + } + } + + log.info("new value ==" + value); + char c = value.charAt(0); + if (c != '[') { + // oldfield is JsonObject + JSONObject valueJO = new JSONObject(value); + // if the array already exists + String existingValue = getEventObjectVal(field).toString(); + if (!existingValue.equals(OBJECT_NOT_FOUND)) { + JSONArray ja = new JSONArray(existingValue); + JSONObject jo = ja.optJSONObject(0); + if (jo != null) { + for (String key : valueJO.keySet()) { + jo.put(key, valueJO.get(key)); + + } + ja.put(0, jo); + + setEventObjectVal(field, ja); + } + } else // if new array + setEventObjectVal(field + "[0]", new JSONObject(value), "JArray"); + } else // oldfield is jsonArray + setEventObjectVal(field, new JSONArray(value)); + + removeEventKey(oldField); + } + } else + log.info(FILTER_NOT_MET); + } + + // this method is to support the mapping 5.x to VES7.x format for additionalInformation field + private void mapNameValueArrayToHashmap(JSONObject jsonObject) { + log.info("mapNameValueArrayToHashmap"); + String field = jsonObject.getString(FIELD); + String oldField = jsonObject.getString(FIELD); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + + if (filter == null || isFilterMet(filter)) { + JSONObject newHashMap = new JSONObject(); // this will hold the newly mapped hashmap elements + JSONArray arrayValue = (JSONArray) getEventObjectVal(oldField); // old Array structure value + JSONObject tempJObj = null; + String tempName = ""; + String tempValue = ""; + if (!arrayValue.toString().equals(OBJECT_NOT_FOUND)) { + log.info("old value ==" + arrayValue.toString()); + // Loop thru the JSONArray, get the name:value pair and write to new JSONObject as hashmap elements + for (int i = 0; i < arrayValue.length(); i++) { + + tempJObj = arrayValue.getJSONObject(i); + if (tempJObj != null) { + tempName = tempJObj.get("name").toString(); + tempValue = tempJObj.get(VALUE).toString(); + newHashMap.put(tempName, tempValue); + } + } + // remove the old Array structure + removeEventKey(oldField); + //Add the new Hashmap + setEventObjectVal(field, newHashMap); + } + } else + log.info(FILTER_NOT_MET); + } + + // this method is to support the mapping 7.x to VES5.x format for additionalInformation field + private void mapHashmapToNameValueArray(JSONObject jsonObject) { + log.info("mapHashmapToNameValueArray"); + System.out.println("mapHashmapToNameValueArray"); + String field = jsonObject.getString(FIELD); + String oldField = jsonObject.getString(FIELD); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + + if (filter == null || isFilterMet(filter)) { + JSONArray newArray = new JSONArray(); // this will hold the new name:value JSONObject + JSONObject nameValJObj; + System.out.println("object ==" + getEventObjectVal(oldField).toString()); + if (!getEventObjectVal(oldField).toString().equals(OBJECT_NOT_FOUND)) { + + JSONObject hashMap = (JSONObject) getEventObjectVal(oldField); // old hashmap structure value + if (hashMap != null) { + log.info("old value ==" + hashMap.toString()); + // Loop thru the hashMap JSONObject, get the hashmap elements add them as name:value JsonObject into the newArray + for (String key : hashMap.keySet()) { + nameValJObj = new JSONObject(); //create new object so not to overwrite in memory for Array insertion + nameValJObj.put("name", key); + nameValJObj.put("value", hashMap.get(key)); + newArray.put(nameValJObj); + } + // remove the old hashMap structure + removeEventKey(oldField); + //Add the newArray containing the name:value Object + setEventObjectVal(field, newArray); + } + } + } else + log.info(FILTER_NOT_MET); + } + + /** + * example - { "functionName": "concatenateValue", "args":{ "filter": + * {"event.commonEventHeader.event":"heartbeat"}, + * FIELD:"event.commonEventHeader.eventName", "concatenate": + * ["event.commonEventHeader.domain","event.commonEventHeader.eventType","event.commonEventHeader.alarmCondition"], + * "delimiter":"_" } } + **/ + public void concatenateValue(JSONObject jsonObject) { + + final String field = jsonObject.getString(FIELD); + final String delimiter = jsonObject.getString("delimiter"); + final JSONArray values = jsonObject.getJSONArray("concatenate"); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + if (filter == null || isFilterMet(filter)) { + StringBuilder value = new StringBuilder(); + for (int i = 0; i < values.length(); i++) { + + String tempVal = evaluate(values.getString(i)); + if (!tempVal.equals(OBJECT_NOT_FOUND)) { + if (i == 0) + value.append(tempVal); + else + value.append(delimiter).append(tempVal); + } + } + + setEventObjectVal(field, value.toString()); + } else + log.info(FILTER_NOT_MET); + } + + public void subtractValue(JSONObject jsonObject) { + + final String field = jsonObject.getString(FIELD); + final JSONArray values = jsonObject.getJSONArray("subtract"); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + if (filter == null || isFilterMet(filter)) { + float value = 0; + for (int i = 0; i < values.length(); i++) { + log.info(values.getString(i)); + String tempVal = evaluate(values.getString(i)); + log.info("tempVal==" + tempVal); + if (!tempVal.equals(OBJECT_NOT_FOUND)) { + if (i == 0) + value = value + Float.valueOf(tempVal); + else + value = value - Float.valueOf(tempVal); + } + } + log.info("value ==" + value); + setEventObjectVal(field, value, "number"); + } else + log.info(FILTER_NOT_MET); + } + + + private void removeEventKey(String field) { + String[] keySet = field.split("\\.", field.length()); + JSONObject keySeries = event; + for (int i = 0; i < (keySet.length - 1); i++) { + + keySeries = keySeries.getJSONObject(keySet[i]); + } + + keySeries.remove(keySet[keySet.length - 1]); + } + + + private boolean checkFilter(JSONObject jo, String key, String logicKey) { + String filterValue = jo.getString(key); + if (filterValue.contains(":")) { + String[] splitVal = filterValue.split(":"); + if ("matches".equals(splitVal[0])) { + if ("not".equals(logicKey)) { + if (getEventObjectVal(key).toString().matches(splitVal[1])) { + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); + return false; + } + } else { + if (!(getEventObjectVal(key).toString().matches(splitVal[1]))) { + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); + return false; + } + } + + } + if ("contains".equals(splitVal[0])) { + if ("not".equals(logicKey)) { + if (getEventObjectVal(key).toString().contains(splitVal[1])) { + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); + return false; + } + } else { + if (!(getEventObjectVal(key).toString().contains(splitVal[1]))) { + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); + return false; + } + } + + } + } else { + if ("not".equals(logicKey)) { + if (getEventObjectVal(key).toString().equals(filterValue)) { + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); + return false; + } + } else { + if (!(getEventObjectVal(key).toString().equals(filterValue))) { + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); + return false; + } + } + } + return true; + } + + + public boolean isFilterMet(JSONObject jo) { + for (String key : jo.keySet()) { + if ("not".equals(key)) { + JSONObject njo = jo.getJSONObject(key); + for (String njoKey : njo.keySet()) { + if (!checkFilter(njo, njoKey, key)) + return false; + } + } else { + if (!checkFilter(jo, key, key)) + return false; + } + } + return true; + } + + /** + * returns a string or JSONObject or JSONArray + **/ + public Object getEventObjectVal(String keySeriesStr) { + keySeriesStr = keySeriesStr.replaceAll("\\[", "."); + keySeriesStr = keySeriesStr.replaceAll("\\]", "."); + if (keySeriesStr.contains("..")) { + keySeriesStr = keySeriesStr.replaceAll("\\.\\.", "."); + } + + if (keySeriesStr.lastIndexOf(".") == keySeriesStr.length() - 1) + keySeriesStr = keySeriesStr.substring(0, keySeriesStr.length() - 1); + String[] keySet = keySeriesStr.split("\\.", keySeriesStr.length()); + Object keySeriesObj = event; + for (String aKeySet : keySet) { + if (keySeriesObj != null) { + if (keySeriesObj instanceof String) { + + log.info("STRING==" + keySeriesObj); + } else if (keySeriesObj instanceof JSONArray) { + keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(aKeySet)); + + } else if (keySeriesObj instanceof JSONObject) { + keySeriesObj = ((JSONObject) keySeriesObj).opt(aKeySet); + + } else { + log.info("unknown object==" + keySeriesObj); + } + } + } + + if (keySeriesObj == null) + return OBJECT_NOT_FOUND; + return keySeriesObj; + } + + public void setEventObjectVal(String keySeriesStr, Object value) { + setEventObjectVal(keySeriesStr, value, "string"); + } + + /** + * returns a string or JSONObject or JSONArray + **/ + public void setEventObjectVal(String keySeriesStr, Object value, String fieldType) { + keySeriesStr = keySeriesStr.replaceAll("\\[", "."); + keySeriesStr = keySeriesStr.replaceAll("\\]", "."); + if (keySeriesStr.contains("..")) { + keySeriesStr = keySeriesStr.replaceAll("\\.\\.", "."); + } + log.info("fieldType==" + fieldType); + + if (keySeriesStr.lastIndexOf(".") == keySeriesStr.length() - 1) + keySeriesStr = keySeriesStr.substring(0, keySeriesStr.length() - 1); + String[] keySet = keySeriesStr.split("\\.", keySeriesStr.length()); + Object keySeriesObj = event; + for (int i = 0; i < (keySet.length - 1); i++) { + + if (keySeriesObj instanceof JSONArray) { + + if (((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i])) == null) // if + // the + // object + // is + // not + // there + // then + // add + // it + { + log.info("Object is null, must add it"); + if (keySet[i + 1].matches("[0-9]*")) // if index then array + ((JSONArray) keySeriesObj).put(Integer.parseInt(keySet[i]), new JSONArray()); + else + ((JSONArray) keySeriesObj).put(Integer.parseInt(keySet[i]), new JSONObject()); + } + keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i])); + + } else if (keySeriesObj instanceof JSONObject) { + if (((JSONObject) keySeriesObj).opt(keySet[i]) == null) // if + // the + // object + // is + // not + // there + // then + // add + // it + { + if (keySet[i + 1].matches("[0-9]*")) // if index then array + ((JSONObject) keySeriesObj).put(keySet[i], new JSONArray()); + else + ((JSONObject) keySeriesObj).put(keySet[i], new JSONObject()); + log.info("Object is null, must add it"); + } + keySeriesObj = ((JSONObject) keySeriesObj).opt(keySet[i]); + } else { + log.info("unknown object==" + keySeriesObj); + } + } + if ("number".equals(fieldType)) { + DecimalFormat df = new DecimalFormat("#.0"); + if (value instanceof String) + ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], + Float.valueOf(df.format(Float.valueOf((String) value)))); + else + ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], Float.valueOf(df.format(value))); + } else if ("integer".equals(fieldType) && value instanceof String) + ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], Integer.valueOf((String) value)); + else if ("JArray".equals(fieldType)) + ((JSONArray) keySeriesObj).put(value); + else + ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], value); + + } +} diff --git a/src/main/java/org/onap/dcae/common/Event.java b/src/main/java/org/onap/dcae/common/Event.java new file mode 100644 index 00000000..1fa8179e --- /dev/null +++ b/src/main/java/org/onap/dcae/common/Event.java @@ -0,0 +1,34 @@ +/*- + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common; + +import com.google.gson.JsonObject; + +import java.util.List; + +class Event { + final JsonObject filter; + final List<Processor> processors; + + Event(JsonObject filter, List<Processor> processors) { + this.filter = filter; + this.processors = processors; + } +} diff --git a/src/main/java/org/onap/dcae/common/EventProcessor.java b/src/main/java/org/onap/dcae/common/EventProcessor.java new file mode 100644 index 00000000..bf3bf70d --- /dev/null +++ b/src/main/java/org/onap/dcae/common/EventProcessor.java @@ -0,0 +1,61 @@ +/*-
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.common;
+
+import com.att.nsa.clock.SaClock;
+import com.att.nsa.logging.LoggingContext;
+import com.att.nsa.logging.log4j.EcompFields;
+import org.json.JSONObject;
+import org.onap.dcae.VesApplication;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EventProcessor implements Runnable {
+
+ private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
+ private EventSender eventSender;
+
+ public EventProcessor(EventSender eventSender) {
+ this.eventSender = eventSender;
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true){
+ JSONObject event = VesApplication.fProcessingInputQueue.take();
+ log.info("QueueSize:" + VesApplication.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event);
+ setLoggingContext(event);
+ log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + eventSender.getDomain(event));
+ eventSender.send(event);
+ log.debug("Message published" + event);
+ }
+ } catch (InterruptedException e) {
+ log.error("EventProcessor InterruptedException" + e.getMessage());
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void setLoggingContext(JSONObject event) {
+ LoggingContext localLC = VESLogger.getLoggingContextForThread(event.get("VESuniqueId").toString());
+ localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
+ }
+}
\ No newline at end of file diff --git a/src/main/java/org/onap/dcae/common/EventSender.java b/src/main/java/org/onap/dcae/common/EventSender.java new file mode 100644 index 00000000..3c95315c --- /dev/null +++ b/src/main/java/org/onap/dcae/common/EventSender.java @@ -0,0 +1,122 @@ +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved.s + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common; + +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import io.vavr.collection.Map; +import java.io.FileReader; +import java.io.IOException; +import java.lang.reflect.Type; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import org.json.JSONObject; +import org.onap.dcae.ApplicationSettings; +import org.onap.dcae.common.publishing.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EventSender { + + private Map<String, String[]> streamidHash; + private ApplicationSettings properties; + private EventPublisher eventPublisher; + + static final Type EVENT_LIST_TYPE = new TypeToken<List<Event>>() {}.getType(); + private static final Logger log = LoggerFactory.getLogger(EventSender.class); + private static final String EVENT_LITERAL = "event"; + private static final String COMMON_EVENT_HEADER = "commonEventHeader"; + private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); + + public EventSender( EventPublisher eventPublisher, ApplicationSettings properties) { + this.eventPublisher = eventPublisher; + this.streamidHash = properties.dMaaPStreamsMapping(); + this.properties = properties; + + } + + public void send(JSONObject event) { + streamidHash.get(getDomain(event)) + .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + event)) + .forEach(streamIds -> sendEventsToStreams(event, streamIds)); + } + + public static String getDomain(JSONObject event) { + return event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain"); + } + + private void sendEventsToStreams(JSONObject event, String[] streamIdList) { + for (String aStreamIdList : streamIdList) { + log.info("Invoking publisher for streamId:" + aStreamIdList); + eventPublisher.sendEvent(overrideEvent(event), aStreamIdList); + } + } + + private JSONObject overrideEvent(JSONObject event) { + JSONObject jsonObject = addCurrentTimeToEvent(event); + if (properties.eventTransformingEnabled()) { + try (FileReader fr = new FileReader("./etc/eventTransform.json")) { + log.info("parse eventTransform.json"); + List<Event> events = new Gson().fromJson(fr, EVENT_LIST_TYPE); + parseEventsJson(events, new ConfigProcessorAdapter(new ConfigProcessors(jsonObject))); + } catch (IOException e) { + log.error("Couldn't find file ./etc/eventTransform.json" + e.toString()); + } + } + if (jsonObject.has("VESversion")) + jsonObject.remove("VESversion"); + + log.debug("Modified event:" + jsonObject); + return jsonObject; + } + + private JSONObject addCurrentTimeToEvent(JSONObject event) { + final Date currentTime = new Date(); + JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", dateFormat.format(currentTime)); + JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER); + commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp); + event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey); + return event; + } + + private void parseEventsJson(List<Event> eventsTransform, ConfigProcessorAdapter configProcessorAdapter) { + for (Event eventTransform : eventsTransform) { + JSONObject filterObj = new JSONObject(eventTransform.filter.toString()); + if (configProcessorAdapter.isFilterMet(filterObj)) { + callProcessorsMethod(configProcessorAdapter, eventTransform.processors); + } + } + } + + private void callProcessorsMethod(ConfigProcessorAdapter configProcessorAdapter, List<Processor> processors) { + for (Processor processor : processors) { + final String functionName = processor.functionName; + final JSONObject args = new JSONObject(processor.args.toString()); + log.info(String.format("functionName==%s | args==%s", functionName, args)); + try { + configProcessorAdapter.runConfigProcessorFunctionByName(functionName, args); + } catch (ReflectiveOperationException e) { + log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause()); + } + } + } +} diff --git a/src/main/java/org/onap/dcae/common/Processor.java b/src/main/java/org/onap/dcae/common/Processor.java new file mode 100644 index 00000000..20ef4da9 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/Processor.java @@ -0,0 +1,33 @@ +/*- + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.common; + +import com.google.gson.JsonObject; + +class Processor { + final String functionName; + final JsonObject args; + + Processor(String functionName, JsonObject args) { + this.functionName = functionName; + this.args = args; + } +} diff --git a/src/main/java/org/onap/dcae/common/SSLContextCreator.java b/src/main/java/org/onap/dcae/common/SSLContextCreator.java new file mode 100644 index 00000000..a76c7cbe --- /dev/null +++ b/src/main/java/org/onap/dcae/common/SSLContextCreator.java @@ -0,0 +1,82 @@ +/*- + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.common; + +import org.springframework.boot.web.server.Ssl; + +import java.nio.file.Path; + +public class SSLContextCreator { + private final String keyStorePassword; + private final String certAlias; + private final Path keyStoreFile; + + private Path trustStoreFile; + private String trustStorePassword; + private boolean hasTlsClientAuthentication = false; + + public static SSLContextCreator create(final Path keyStoreFile, final String certAlias, final String password) { + return new SSLContextCreator(keyStoreFile, certAlias, password); + } + + private SSLContextCreator(final Path keyStoreFile, final String certAlias, final String password) { + this.certAlias = certAlias; + this.keyStoreFile = keyStoreFile; + this.keyStorePassword = password; + } + + public SSLContextCreator withTlsClientAuthentication(final Path trustStoreFile, final String password) { + hasTlsClientAuthentication = true; + this.trustStoreFile = trustStoreFile; + this.trustStorePassword = password; + + return this; + } + + private void configureKeyStore(final Ssl ssl) { + final String keyStore = keyStoreFile.toAbsolutePath().toString(); + + ssl.setKeyStore(keyStore); + ssl.setKeyPassword(keyStorePassword); + ssl.setKeyAlias(certAlias); + } + + private void configureTrustStore(final Ssl ssl) { + final String trustStore = trustStoreFile.toAbsolutePath().toString(); + + ssl.setTrustStore(trustStore); + ssl.setTrustStorePassword(trustStorePassword); + ssl.setClientAuth(Ssl.ClientAuth.NEED); + } + + public Ssl build() { + final Ssl ssl = new Ssl(); + ssl.setEnabled(true); + + configureKeyStore(ssl); + + if (hasTlsClientAuthentication) { + configureTrustStore(ssl); + } + + return ssl; + } +}
\ No newline at end of file diff --git a/src/main/java/org/onap/dcae/common/VESLogger.java b/src/main/java/org/onap/dcae/common/VESLogger.java new file mode 100644 index 00000000..c7354502 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/VESLogger.java @@ -0,0 +1,160 @@ +/*- + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.common; + +import com.att.nsa.clock.SaClock; +import com.att.nsa.logging.LoggingContext; +import com.att.nsa.logging.LoggingContextFactory; +import com.att.nsa.logging.log4j.EcompFields; +import jline.internal.Log; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.UUID; + +public class VESLogger { + + public static final String VES_AGENT = "VES_AGENT"; + public static final String REQUEST_ID = "requestId"; + private static final String IP_ADDRESS = "127.0.0.1"; + private static final String HOST_NAME = "localhost"; + + public static Logger auditLog; + public static Logger metricsLog; + public static Logger errorLog; + public static Logger debugLog; + + // Common LoggingContext + private static LoggingContext commonLC; + // Thread-specific LoggingContext + private static LoggingContext threadLC; + public LoggingContext lc; + + /** + * Returns the common LoggingContext instance that is the base context for + * all subsequent instances. + * + * @return the common LoggingContext + */ + public static LoggingContext getCommonLoggingContext() { + if (commonLC == null) { + commonLC = new LoggingContextFactory.Builder().build(); + final UUID uuid = UUID.randomUUID(); + + commonLC.put(REQUEST_ID, uuid.toString()); + } + return commonLC; + } + + /** + * Get a logging context for the current thread that's based on the common + * logging context. Populate the context with context-specific values. + * + * @param aUuid uuid for request id + * @return a LoggingContext for the current thread + */ + public static LoggingContext getLoggingContextForThread(UUID aUuid) { + // note that this operation requires everything from the common context + // to be (re)copied into the target context. That seems slow, but it + // actually + // helps prevent the thread from overwriting supposedly common data. It + // also + // should be fairly quick compared with the overhead of handling the + // actual + // service call. + + threadLC = new LoggingContextFactory.Builder().withBaseContext(getCommonLoggingContext()).build(); + // Establish the request-specific UUID, as long as we are here... + threadLC.put(REQUEST_ID, aUuid.toString()); + threadLC.put(EcompFields.kEndTimestamp, SaClock.now()); + + return threadLC; + } + + /** + * Get a logging context for the current thread that's based on the common + * logging context. Populate the context with context-specific values. + * + * @param aUuid uuid for request id + * @return a LoggingContext for the current thread + */ + public static LoggingContext getLoggingContextForThread(String aUuid) { + // note that this operation requires everything from the common context + // to be (re)copied into the target context. That seems slow, but it + // actually + // helps prevent the thread from overwriting supposedly common data. It + // also + // should be fairly quick compared with the overhead of handling the + // actual + // service call. + + threadLC = new LoggingContextFactory.Builder().withBaseContext(getCommonLoggingContext()).build(); + // Establish the request-specific UUID, as long as we are here... + threadLC.put(REQUEST_ID, aUuid); + threadLC.put("statusCode", "COMPLETE"); + threadLC.put(EcompFields.kEndTimestamp, SaClock.now()); + return threadLC; + } + + public static void setUpEcompLogging() { + + // Create ECOMP Logger instances + auditLog = LoggerFactory.getLogger("com.att.ecomp.audit"); + metricsLog = LoggerFactory.getLogger("com.att.ecomp.metrics"); + debugLog = LoggerFactory.getLogger("com.att.ecomp.debug"); + errorLog = LoggerFactory.getLogger("com.att.ecomp.error"); + + final LoggingContext lc = getCommonLoggingContext(); + + String ipAddr = IP_ADDRESS; + String hostname = HOST_NAME; + try { + final InetAddress ip = InetAddress.getLocalHost(); + hostname = ip.getCanonicalHostName(); + ipAddr = ip.getHostAddress(); + } catch (UnknownHostException x) { + Log.debug(x.getMessage()); + } + + lc.put("serverName", hostname); + lc.put("serviceName", "VESCollecor"); + lc.put("statusCode", "RUNNING"); + lc.put("targetEntity", "NULL"); + lc.put("targetServiceName", "NULL"); + lc.put("server", hostname); + lc.put("serverIpAddress", ipAddr); + + // instance UUID is meaningless here, so we just create a new one each + // time the + // server starts. One could argue each new instantiation of the service + // should + // have a new instance ID. + lc.put("instanceUuid", ""); + lc.put("severity", ""); + lc.put(EcompFields.kEndTimestamp, SaClock.now()); + lc.put("EndTimestamp", SaClock.now()); + lc.put("partnerName", "NA"); + } + +} diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java new file mode 100644 index 00000000..274e4490 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java @@ -0,0 +1,112 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import io.vavr.collection.List; +import io.vavr.collection.Map; +import io.vavr.control.Option; +import io.vavr.control.Try; +import org.onap.dcae.common.AnyNode; + +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.json.JSONObject; + +import static io.vavr.API.*; +import static org.onap.dcae.common.publishing.VavrUtils.enhanceError; +import static org.onap.dcae.common.publishing.VavrUtils.f; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +@SuppressWarnings("mapFailure takes a generic varargs, unchecked because of Javas type system limitation, actually safe to do") +public final class DMaaPConfigurationParser { + + public static Try<Map<String, PublisherConfig>> parseToDomainMapping(Path configLocation) { + return readFromFile(configLocation) + .flatMap(DMaaPConfigurationParser::toJSON) + .flatMap(DMaaPConfigurationParser::toConfigMap); + } + + public static Try<Map<String, PublisherConfig>> parseToDomainMapping(JSONObject config) { + return toJSON(config.toString()) + .flatMap(DMaaPConfigurationParser::toConfigMap); + } + + private static Try<String> readFromFile(Path configLocation) { + return Try(() -> new String(Files.readAllBytes(configLocation))) + .mapFailure(enhanceError(f("Could not read DMaaP configuration from location: '%s'", configLocation))); + } + + private static Try<AnyNode> toJSON(String config) { + return Try(() -> AnyNode.fromString(config)) + .mapFailure(enhanceError(f("DMaaP configuration '%s' is not a valid JSON document", config))); + } + + private static Try<Map<String, PublisherConfig>> toConfigMap(AnyNode config) { + return Try(() -> usesLegacyFormat(config) ? parseLegacyFormat(config) : parseNewFormat(config)) + .mapFailure(enhanceError(f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config))); + } + + private static boolean usesLegacyFormat(AnyNode dMaaPConfig) { + return dMaaPConfig.has("channels"); + } + + private static Map<String, PublisherConfig> parseLegacyFormat(AnyNode root) { + return root.get("channels").toList().toMap( + channel -> channel.get("name").toString(), + channel -> { + String destinationsStr = channel.getAsOption("cambria.url") + .getOrElse(channel.getAsOption("cambria.hosts").get()) + .toString(); + String topic = channel.get("cambria.topic").toString(); + Option<String> maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString); + Option<String> maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString); + List<String> destinations = List(destinationsStr.split(",")); + return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations); + }); + } + + private static Map<String, PublisherConfig> parseNewFormat(AnyNode root) { + return root.keys().toMap( + channelName -> channelName, + channelName -> { + AnyNode channelConfig = root.get(channelName); + Option<String> maybeUser = channelConfig.getAsOption("aaf_username").map(AnyNode::toString); + Option<String> maybePassword = channelConfig.getAsOption("aaf_password").map(AnyNode::toString); + URL topicURL = unchecked( + () -> new URL(channelConfig.get("dmaap_info").get("topic_url").toString())).apply(); + String[] pathSegments = topicURL.getPath().substring(1).split("/"); + String topic = pathSegments[1]; + String destination = "events".equals(pathSegments[0]) ? topicURL.getAuthority() : topicURL.getHost(); + List<String> destinations = List(destination); + return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations); + }); + } + + private static PublisherConfig buildBasedOnAuth(Option<String> maybeUser, Option<String> maybePassword, + String topic, List<String> destinations) { + return maybeUser.flatMap(user -> maybePassword.map(password -> Tuple(user, password))) + .map(credentials -> new PublisherConfig(destinations, topic, credentials._1, credentials._2)) + .getOrElse(new PublisherConfig(destinations, topic)); + } +} diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java new file mode 100644 index 00000000..aa3dc7a3 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java @@ -0,0 +1,100 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.common.publishing; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.clock.SaClock; +import com.att.nsa.logging.LoggingContext; +import com.att.nsa.logging.log4j.EcompFields; +import io.vavr.collection.Map; +import io.vavr.control.Try; +import org.json.JSONObject; +import org.onap.dcae.common.VESLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +import static org.onap.dcae.common.publishing.VavrUtils.f; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +class DMaaPEventPublisher implements EventPublisher { + private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100; + private static final String VES_UNIQUE_ID = "VESuniqueId"; + private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class); + private final DMaaPPublishersCache publishersCache; + private final Logger outputLogger; + + DMaaPEventPublisher(DMaaPPublishersCache DMaaPPublishersCache, + Logger outputLogger) { + this.publishersCache = DMaaPPublishersCache; + this.outputLogger = outputLogger; + } + + @Override + public void sendEvent(JSONObject event, String domain) { + clearVesUniqueIdFromEvent(event); + publishersCache.getPublisher(domain) + .onEmpty(() -> + log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event))) + .forEach(publisher -> sendEvent(event, domain, publisher)); + } + + @Override + public void reconfigure(Map<String, PublisherConfig> dMaaPConfig) { + publishersCache.reconfigure(dMaaPConfig); + } + + private void sendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) { + Try.run(() -> uncheckedSendEvent(event, domain, publisher)) + .onFailure(exc -> closePublisher(event, domain, exc)); + } + + private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) + throws IOException { + int pendingMsgs = publisher.send("MyPartitionKey", event.toString()); + if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) { + log.info("Pending messages count: " + pendingMsgs); + } + String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, domain); + log.info(infoMsg); + outputLogger.info(infoMsg); + } + + private void closePublisher(JSONObject event, String domain, Throwable e) { + log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.", + event, domain), e); + publishersCache.closePublisherFor(domain); + } + + private void clearVesUniqueIdFromEvent(JSONObject event) { + if (event.has(VES_UNIQUE_ID)) { + String uuid = event.get(VES_UNIQUE_ID).toString(); + LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); + localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); + log.debug("Removing VESuniqueid object from event"); + event.remove(VES_UNIQUE_ID); + } + } +} diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java new file mode 100644 index 00000000..a93073bf --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java @@ -0,0 +1,61 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; +import io.vavr.control.Try; + +import static io.vavr.API.Try; +import static org.onap.dcae.common.publishing.VavrUtils.enhanceError; +import static org.onap.dcae.common.publishing.VavrUtils.f; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +final class DMaaPPublishersBuilder { + + static Try<CambriaBatchingPublisher> buildPublisher(PublisherConfig config) { + return Try(() -> builder(config).build()) + .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config))); + } + + private static PublisherBuilder builder(PublisherConfig config) { + if (config.isSecured()) { + return authenticatedBuilder(config); + } else { + return unAuthenticatedBuilder(config); + } + } + + private static PublisherBuilder authenticatedBuilder(PublisherConfig config) { + return unAuthenticatedBuilder(config) + .usingHttps() + .authenticatedByHttp(config.userName().get(), config.password().get()); + } + + private static PublisherBuilder unAuthenticatedBuilder(PublisherConfig config) { + return new CambriaClientBuilders.PublisherBuilder() + .usingHosts(config.destinations().mkString(",")) + .onTopic(config.topic()) + .logSendFailuresAfter(5); + } +} diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java new file mode 100644 index 00000000..b7997ef9 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java @@ -0,0 +1,121 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.google.common.cache.*; +import io.vavr.collection.Map; +import io.vavr.control.Option; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static io.vavr.API.Option; +import static org.onap.dcae.common.publishing.VavrUtils.f; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +class DMaaPPublishersCache { + + private static final Logger log = LoggerFactory.getLogger(DMaaPPublishersCache.class); + private final LoadingCache<String, CambriaBatchingPublisher> publishersCache; + private AtomicReference<Map<String, PublisherConfig>> dMaaPConfiguration; + + DMaaPPublishersCache(Map<String, PublisherConfig> dMaaPConfiguration) { + this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); + this.publishersCache = CacheBuilder.newBuilder() + .removalListener(new OnPublisherRemovalListener()) + .build(new CambriaPublishersCacheLoader()); + } + + DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader, + OnPublisherRemovalListener onPublisherRemovalListener, + Map<String, PublisherConfig> dMaaPConfiguration) { + this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); + this.publishersCache = CacheBuilder.newBuilder() + .removalListener(onPublisherRemovalListener) + .build(dMaaPPublishersCacheLoader); + } + + Option<CambriaBatchingPublisher> getPublisher(String streamID) { + try { + return Option(publishersCache.getUnchecked(streamID)); + } catch (Exception e) { + log.warn("Could not create / load Cambria Publisher for streamID", e); + return Option.none(); + } + } + + void closePublisherFor(String streamId) { + publishersCache.invalidate(streamId); + } + + synchronized void reconfigure(Map<String, PublisherConfig> newConfig) { + Map<String, PublisherConfig> currentConfig = dMaaPConfiguration.get(); + Map<String, PublisherConfig> removedConfigurations = currentConfig + .filterKeys(domain -> !newConfig.containsKey(domain)); + Map<String, PublisherConfig> changedConfigurations = newConfig + .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e))); + dMaaPConfiguration.set(newConfig); + removedConfigurations.merge(changedConfigurations).forEach(e -> publishersCache.invalidate(e._1)); + } + + static class OnPublisherRemovalListener implements RemovalListener<String, CambriaBatchingPublisher> { + + @Override + public void onRemoval(@Nonnull RemovalNotification<String, CambriaBatchingPublisher> notification) { + CambriaBatchingPublisher publisher = notification.getValue(); + if (publisher != null) { // The value might get Garbage Collected at this moment, regardless of @Nonnull + try { + int timeout = 20; + TimeUnit unit = TimeUnit.SECONDS; + java.util.List<?> stuck = publisher.close(timeout, unit); + if (!stuck.isEmpty()) { + log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', " + + "%s messages were dropped", stuck.size(), timeout, unit)); + } + } catch (InterruptedException | IOException e) { + log.error("Could not close Cambria publisher, some messages might have been dropped", e); + Thread.currentThread().interrupt(); + } + } + } + } + + class CambriaPublishersCacheLoader extends CacheLoader<String, CambriaBatchingPublisher> { + + @Override + public CambriaBatchingPublisher load(@Nonnull String domain) { + return dMaaPConfiguration.get() + .get(domain) + .toTry(() -> new RuntimeException( + f("DMaaP configuration contains no configuration for domain: '%s'", domain))) + .flatMap(DMaaPPublishersBuilder::buildPublisher) + .get(); + } + } + +} diff --git a/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java new file mode 100644 index 00000000..42e721a8 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java @@ -0,0 +1,38 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import io.vavr.collection.Map; +import org.json.JSONObject; +import org.slf4j.Logger; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +public interface EventPublisher { + + static EventPublisher createPublisher(Logger outputLogger, Map<String, PublisherConfig> dMaaPConfig) { + return new DMaaPEventPublisher(new DMaaPPublishersCache(dMaaPConfig), outputLogger); + } + + void sendEvent(JSONObject event, String domain); + + void reconfigure(Map<String, PublisherConfig> dMaaPConfig); +} diff --git a/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java new file mode 100644 index 00000000..1fd0d316 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java @@ -0,0 +1,99 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import io.vavr.collection.List; +import io.vavr.control.Option; + +import java.util.Objects; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +public final class PublisherConfig { + + private final List<String> destinations; + private final String topic; + private String userName; + private String password; + + PublisherConfig(List<String> destinations, String topic) { + this.destinations = destinations; + this.topic = topic; + } + + PublisherConfig(List<String> destinations, String topic, String userName, String password) { + this.destinations = destinations; + this.topic = topic; + this.userName = userName; + this.password = password; + } + + List<String> destinations() { + return destinations; + } + + String topic() { + return topic; + } + + Option<String> userName() { + return Option.of(userName); + } + + Option<String> password() { + return Option.of(password); + } + + boolean isSecured() { + return userName().isDefined() && password().isDefined(); + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PublisherConfig that = (PublisherConfig) o; + return Objects.equals(destinations, that.destinations) && + Objects.equals(topic, that.topic) && + Objects.equals(userName, that.userName) && + Objects.equals(password, that.password); + } + + @Override + public int hashCode() { + return Objects.hash(destinations, topic, userName, password); + } + + @Override + public String toString() { + return "PublisherConfig{" + + "destinations=" + destinations + + ", topic='" + topic + '\'' + + ", userName='" + userName + '\'' + + ", password='" + password + '\'' + + '}'; + } +} diff --git a/src/main/java/org/onap/dcae/common/publishing/VavrUtils.java b/src/main/java/org/onap/dcae/common/publishing/VavrUtils.java new file mode 100644 index 00000000..e1eae247 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/VavrUtils.java @@ -0,0 +1,62 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import io.vavr.API; +import io.vavr.API.Match.Case; +import java.util.function.Consumer; +import org.slf4j.Logger; + +import static io.vavr.API.$; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +public final class VavrUtils { + + private VavrUtils() { + // utils aggregator + } + + /** + * Shortcut for 'string interpolation' + */ + public static String f(String msg, Object... args) { + return String.format(msg, args); + } + + /** + * Wrap failure with a more descriptive message of what has failed and chain original cause. Used to provide a + * context for errors instead of raw exception. + */ + public static Case<Throwable, Throwable> enhanceError(String msg) { + return API.Case($(), e -> new RuntimeException(msg, e)); + } + + public static Case<Throwable, Throwable> enhanceError(String pattern, Object... arguments) { + return API.Case($(), e -> new RuntimeException(f(pattern, arguments), e)); + } + + public static Consumer<Throwable> logError(Logger withLogger) { + return e -> withLogger.error(e.getMessage(), e); + } + + +} |