diff options
-rw-r--r-- | README.md | 165 | ||||
-rw-r--r-- | src/main/java/org/onap/dcae/commonFunction/CommonStartup.java | 20 | ||||
-rw-r--r-- | src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java | 68 |
3 files changed, 124 insertions, 129 deletions
@@ -3,107 +3,102 @@ DCAE VESCollector This is the repository for VES Collector for Open DCAE. +Virtual Event Streaming (VES) Collector is RESTful collector for processing JSON messages into DCAE. The collector verifies the source (when authentication is enabled) and validates the events against VES schema before distributing to DMAAP MR topics for downstream system to subscribe. The VESCollector also provides configurable event transformation function and event distribution to DMAAP MR topics. + +The collector supports individual events or eventbatch posted to collector end-point(s) and post them to interface/bus for other application to subscribe. + + ### Build Instructions -This project is organized as a mvn project for a jar package. +This project is organized as a mvn project and has "org.onap.dcaegen2" as parent project. The build generate a jar and package into docker container. ``` -git clone ssh://git@<repo-address>:dcae-collectors/OpenVESCollector.git +git clone ssh://vv770d@gerrit.onap.org:29418/dcaegen2/collectors/ves mvn clean install ``` ### Docker Image -The jar file is bundled into a docker image installed by the DCAE Controller. Following is the process to creating the image -#### Set up the packaging environment -1. Extract the VESCollector code and do mvn build ``` -$ git clone ssh://git@<repo-address>:dcae-collectors/OpenVESCollector.git +git clone ssh://vv770d@gerrit.onap.org:29418/dcaegen2/collectors/ves +mvn clean deploy ``` -2. Once the collector build is successful build dcae-controller -``` -BASE_WS="/var/lib/jenkins/workspace" -PROJECT="build-dcae-controller" -DCM_DIR="dcae-org.onap.dcae.controller/dcae-controller-service-standardeventcollector-manager/target/" -ARTIFACT="dcae-controller-service-standardeventcollector-manager-0.1.0-SNAPSHOT-runtime.zip" -DCM_AR="${BASE_WS}/${PROJECT}/${DCM_DIR}/${ARTIFACT}" -echo "WORKSPACE: ${WORKSPACE}" -if [ ! -f "${DCM_AR}" ] -then - echo "FATAL error cannot locate ${DCM_AR}" - exit 2 -fi -TARGET=${WORKSPACE}/target -STAGE=${TARGET}/stage -DCM_DIR=${STAGE}/opt/app/manager -[ ! -d ${DCM_DIR} ] && mkdir -p ${DCM_DIR} -unzip -qo -d ${DCM_DIR} ${DCM_AR} -``` -3. Get the VES collector Service manager artifacts. +For R1 - image/version pushed to nexus3 ``` -DCM_DIR=${WORKSPACE}/target/stage/opt/app/manager -[ -f "${DCM_DIR}/start-manager.sh" ] && exit 0 -cat <<'EOF' > ${DCM_DIR}/start-manager.sh -#!/bin/bash -MAIN=org.openecomp.dcae.controller.service.standardeventcollector.servers.manager.DcaeControllerServiceStandardeventcollectorManagerServer -ACTION=start -WORKDIR=/opt/app/manager -LOGS="${WORKDIR}/logs" -[ ! -d $LOGS ] && mkdir -p $LOGS -echo 10.0.4.102 $(hostname).dcae.simpledemo.openecomp.org >> /etc/hosts -exec java -cp ./config:./lib:./lib/*:./bin ${MAIN} ${ACTION} > logs/manager.out 2>logs/manager.err -EOF -chmod 775 ${DCM_DIR}/start-manager.sh +nexus3.onap.org:10003/snapshots/onap/org.onap.dcaegen2.collectors.ves.vescollector 1.1 ``` -3. Obtain the required packages to be included in docker -``` -cat <<'EOF' > ${WORKSPACE}/target/stage/Dockerfile -FROM ubuntu:14.04 -MAINTAINER dcae@lists.openecomp.org -WORKDIR /opt/app/manager -ENV HOME /opt/app/SEC -ENV JAVA_HOME /usr -RUN apt-get update && apt-get install -y \ - bc \ - curl \ - telnet \ - vim \ - netcat \ - openjdk-7-jdk -COPY opt /opt -EXPOSE 9999 -CMD [ "/opt/app/manager/start-manager.sh" ] -EOF -``` -4. Extract VES collector jar and copy required directory into image build directory + +### Deployment + +VESCollector in DCAE will be deployed as mS via DCAEGEN2 controller. A blueprint will be generated (CLAMP/SDC) which will fetch the docker image and install on the dockerhost identified. VESCollector on startup will query the configbindingService for updated configuration and starts the service. When configuration change is detected by DCAEGEN2 controller (via policy flow) - then contoller will notify Collector to fetch new configuration again. + +For testing purpose, the docker image includes preset configuration which can be ran without DCAEGEN2 platform. + + +### Testing + +For R1 as only measurement and faults are expected in ONAP, configuration are preset currently sto support these two topics only. + ``` -AR=${WORKSPACE}/target/OpenVESCollector-0.0.1-SNAPSHOT-bundle.tar.gz -STAGE=${WORKSPACE}/target/stage -APP_DIR=${STAGE}/opt/app/SEC -[ -d ${STAGE}/opt/app/OpenVESCollector-0.0.1-SNAPSHOT ] && rm -rf ${STAGE}/opt/app/OpenVESCollector-0.0.1-SNAPSHOT -[ ! -f $APP_DIR ] && mkdir -p ${APP_DIR} -gunzip -c ${AR} | tar xvf - -C ${APP_DIR} --strip-components=1 -# lji: removal of ^M in the VES startup script -sed -i 's/\r$//g' ${APP_DIR}/bin/SErestfulCollector.sh -#find ${APP_DIR} -name "*.sh" -print0 |xargs -0 sed -i 's/\r$//g' +STEPS FOR SETUP/TEST +1) Get the VESCollector image from Nexus + docker pull nexus.onap.org:10001/onap/org.onap.dcaegen2.collectors.ves.vescollector:1.1 +2) Start the container (change the DMAAPHOST environment value to running DMAAP instance host) + docker run -d -p 8080:8080/tcp -p 8443:8443/tcp -P -e DMAAPHOST='10.0.0.174' nexus.onap.org:10001/onap/org.onap.dcaegen2.collectors.ves.vescollector:1.1 +3) Login into container and tail /opt/app/VESCollector/logs/collector.log +4) Simulate event into VEScollector (can be done from different vm or same) + curl -i -X POST -d @measurement.txt --header "Content-Type: application/json" https://localhost:8443/eventListener/v5 -k + or curl -i -X POST -d @measurement.txt --header "Content-Type: application/json" http://localhost:8080/eventListener/v5 -k + Note: If DMAAPHOST provided is invalid, you will see exception around publish on the collector.logs (collector queues and attempts to resend the event hence exceptions reported will be periodic). If you don’t want to see the error, publish to dmaap can be disabled by changing either “collector.dmaap.streamid” on etc/collector.properties OR by modifying the “name” defined on etc/DmaapConfig.json. + + Any changes to property within container requires collector restart + /opt/app/VESCollector/bin/VESrestfulCollector.sh stop + /opt/app/VESCollector/bin/VESrestfulCollector.sh start + +5) If DMAAP instance (and DMAAPHOST passed during VESCollector startup) and VES input is valid, then events will be pushed to below topics depending on the domain + Fault :http://<dmaaphost>:3904/events/unauthenticated.SEC_FAULT_OUTPUT + Measurement : http://<dmaaphost>:3904/events/unauthenticated.SEC_MEASUREMENT_OUTPUT +6) When test is done – do ensure to remove the container (docker rm -f <containerid>) to avoid port conflict ``` -#### Create the Docker image and push package to the OpenECOMP Nexus distribution server + +Authentication is disabled on the container for R1; below are the steps for enabling HTTPS/authentication for VESCollector. ``` -# -# build the docker image. tag and then push to the remote repo -# -IMAGE="dcae-controller-common-event" -TAG="latest" -LFQI="${IMAGE}:${TAG}" -REPO="ecomp-nexus:51212" -RFQI="${REPO}/${LFQI}" -BUILD_PATH="${WORKSPACE}/target/stage" -# build a docker image -docker build --rm -t ${LFQI} ${BUILD_PATH} -# tag -docker tag ${LFQI} ${RFQI} -# push to remote repo -docker push ${RFQI} +1) Login to the container +2) Open /opt/app/VESCollector/etc/collector.properties and edit below properties + a) Comment below property (with authentication enabled, standard http should be disabled) + collector.service.port=8080 + b) Enable basic-authentication + header.authflag=1 + Note: The actual credentials is stored part of header.authlist parameter. This is list of userid,password (base64encoded) values. Default configuration has below set + sample1,c2FtcGxlMQ==|vdnsagg,dmRuc2FnZw==, where password maps to same value as username. +3) Restart the collector + cd /opt/app/VESCollector/bin + ./VESrestfulCollector.sh stop + ./VESrestfulCollector.sh start +4) Exit from container and ensure tcp port on VM is not hanging on finwait – you can execute “netstat -an | grep 8443” . If under FIN_WAIT2, wait for server to release. +5) Simulate via curl (Note - username/pwd will be required) + Example of successfull POST: + vv770d@osdcae-dev-16:~$ curl -i -u 'sample1:sample1' -X POST -d @volte.txt --header "Content-Type: application/json" https://localhost:8443/eventListener/v5 -k + HTTP/1.1 200 OK + Server: Apache-Coyote/1.1 + X-Rathravane: ~ software is craft ~ + Content-Type: application/json;charset=ISO-8859-1 + Content-Length: 17 + Date: Thu, 21 Sep 2017 22:23:49 GMT + Message Accepted + + Example of authentication failure: + vv770d@osdcae-dev-16:~$ curl -i -X POST -d @volte.txt --header "Content-Type: application/json" https://localhost:8443/eventListener/v5 -k + HTTP/1.1 401 Unauthorized + Server: Apache-Coyote/1.1 + X-Rathravane: ~ software is craft ~ + Content-Type: application/json;charset=ISO-8859-1 + Content-Length: 96 + Date: Thu, 21 Sep 2017 22:20:43 GMT + Connection: close + {"requestError":{"GeneralException":{"MessagID":"\"POL2000\"","text":"\"Unauthorized user\""}}} + +Note: In general support for HTTPS also require certificate/keystore be installed on target VM with FS mapped into the container for VESCollector to load. For demo and testing purpose - a self signed certificate is included within docker build. When deployed via DCAEGEN2 platform - these configuration will be overridden dynamically to map to required path/certificate name. This will be exercised post R1 though. ``` diff --git a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java index cd23de87..b974ed53 100644 --- a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java +++ b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java @@ -213,7 +213,7 @@ public class CommonStartup extends NsaBaseEndpoint implements Runnable { fTomcatServer.start(); } catch (LifecycleException | IOException e) { - LOG.error("lifecycle or IO: ", e); + log.error("lifecycle or IO: ", e); } fTomcatServer.await(); } @@ -246,7 +246,7 @@ public class CommonStartup extends NsaBaseEndpoint implements Runnable { } } - LOG.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); + log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); CommonStartup.metriclog.info("EVENT_PUBLISH_END"); // ecomplogger.debug(secloggerMessageEnum.SEC_COLLECT_AND_PULIBISH_SUCCESS); @@ -269,20 +269,20 @@ public class CommonStartup extends NsaBaseEndpoint implements Runnable { try { // System.out.println("Applying schema: @<@<"+jsonSchema+">@>@ to // data: #<#<"+jsonData+">#>#"); - LOG.trace("Schema validation for event:" + jsonData); + log.trace("Schema validation for event:" + jsonData); JsonNode schemaNode = JsonLoader.fromString(jsonSchema); JsonNode data = JsonLoader.fromString(jsonData); JsonSchemaFactory factory = JsonSchemaFactory.byDefault(); JsonSchema schema = factory.getJsonSchema(schemaNode); report = schema.validate(data); } catch (JsonParseException e) { - LOG.error("schemavalidate:JsonParseException for event:" + jsonData); + log.error("schemavalidate:JsonParseException for event:" + jsonData); return e.getMessage().toString(); } catch (ProcessingException e) { - LOG.error("schemavalidate:Processing exception for event:" + jsonData); + log.error("schemavalidate:Processing exception for event:" + jsonData); return e.getMessage().toString(); } catch (IOException e) { - LOG.error( + log.error( "schemavalidate:IO exception; something went wrong trying to read json data for event:" + jsonData); return e.getMessage().toString(); } @@ -290,21 +290,21 @@ public class CommonStartup extends NsaBaseEndpoint implements Runnable { Iterator<ProcessingMessage> iter = report.iterator(); while (iter.hasNext()) { ProcessingMessage pm = iter.next(); - LOG.trace("Processing Message: " + pm.getMessage()); + log.trace("Processing Message: " + pm.getMessage()); } result = String.valueOf(report.isSuccess()); } try { - LOG.debug("Validation Result:" + result + " Validation report:" + report); + log.debug("Validation Result:" + result + " Validation report:" + report); } catch (NullPointerException e) { - LOG.error("schemavalidate:NullpointerException on report"); + log.error("schemavalidate:NullpointerException on report"); } return result; } public static LinkedBlockingQueue<JSONObject> fProcessingInputQueue; private static ApiServer fTomcatServer = null; - private static final Logger LOG = LoggerFactory.getLogger(CommonStartup.class); + private static final Logger log = LoggerFactory.getLogger(CommonStartup.class); } diff --git a/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java b/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java index 9a056226..51158aa7 100644 --- a/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java +++ b/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java @@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory; public class ConfigProcessors { - private static Logger LOG = LoggerFactory.getLogger(ConfigProcessors.class); + private static Logger log = LoggerFactory.getLogger(ConfigProcessors.class); private static final String FIELD = "field"; private static final String OLD_FIELD = "oldField"; private static final String FILTER = "filter"; @@ -59,7 +59,7 @@ public class ConfigProcessors { getEventObjectVal(field); } else - LOG.info("Filter not met"); + log.info("Filter not met"); } /** @@ -78,7 +78,7 @@ public class ConfigProcessors { setEventObjectVal(field, value); } else - LOG.info("Filter not met"); + log.info("Filter not met"); } /** @@ -112,7 +112,7 @@ public class ConfigProcessors { setEventObjectVal("suppressEvent", "true"); } else - LOG.info("Filter not met"); + log.info("Filter not met"); } /** @@ -133,7 +133,7 @@ public class ConfigProcessors { setEventObjectVal(field, value, fieldType); } else - LOG.info("Filter not met"); + log.info("Filter not met"); //log.info("addAttribute End"); } @@ -153,7 +153,7 @@ public class ConfigProcessors { setEventObjectVal(field, value); } else - LOG.info("Filter not met"); + log.info("Filter not met"); } /** @@ -170,7 +170,7 @@ public class ConfigProcessors { removeEventKey(field); } else - LOG.info("Filter not met"); + log.info("Filter not met"); } /** @@ -178,7 +178,7 @@ public class ConfigProcessors { */ public void renameArrayInArray(JSONObject J) //map { - LOG.info("renameArrayInArray"); + log.info("renameArrayInArray"); final String field = J.getString(FIELD); final String oldField = J.getString(OLD_FIELD); final JSONObject filter = J.optJSONObject(FILTER); @@ -200,15 +200,15 @@ public class ConfigProcessors { final String value = oldValue.replaceAll(oldArrayName, newArrayName); //log.info("oldArrayName ==" + oldArrayName); //log.info("newArrayName ==" + newArrayName); - LOG.info("oldValue ==" + oldValue); - LOG.info("value ==" + value); + log.info("oldValue ==" + oldValue); + log.info("value ==" + value); JSONArray ja = new JSONArray(value); removeEventKey(oldfsplit[0]); setEventObjectVal(fsplit[0], ja); } } else - LOG.info("Filter not met"); + log.info("Filter not met"); } /** @@ -234,7 +234,7 @@ public class ConfigProcessors { */ public String performOperation(String operation, String value) { - LOG.info("performOperation"); + log.info("performOperation"); if (operation != null) { if (operation.equals("convertMBtoKB")) @@ -274,7 +274,7 @@ public class ConfigProcessors { } } else - LOG.info("Filter not met"); + log.info("Filter not met"); } /** @@ -282,7 +282,7 @@ public class ConfigProcessors { */ public void mapToJArray(JSONObject J) { - LOG.info("mapToJArray"); + log.info("mapToJArray"); String field = J.getString(FIELD); String oldField = J.getString(OLD_FIELD); final JSONObject filter = J.optJSONObject(FILTER); @@ -297,7 +297,7 @@ public class ConfigProcessors { String value = getEventObjectVal(oldField).toString(); if (!value.equals(OBJECT_NOT_FOUND)) { - LOG.info("old value ==" + value.toString()); + log.info("old value ==" + value.toString()); //update old value based on attrMap if (attrMap != null) { @@ -309,7 +309,7 @@ public class ConfigProcessors { } } - LOG.info("new value ==" + value); + log.info("new value ==" + value); char c = value.charAt(0); if (c != '[') { @@ -344,7 +344,7 @@ public class ConfigProcessors { } } else - LOG.info("Filter not met"); + log.info("Filter not met"); } /** @@ -385,7 +385,7 @@ public class ConfigProcessors { setEventObjectVal(field, value); } else - LOG.info("Filter not met"); + log.info("Filter not met"); } public void subtractValue(JSONObject J) @@ -399,9 +399,9 @@ public class ConfigProcessors { float value = 0; for (int i=0; i < values.length(); i++) { - LOG.info(values.getString(i)); + log.info(values.getString(i)); String tempVal = evaluate(values.getString(i)); - LOG.info("tempVal==" + tempVal); + log.info("tempVal==" + tempVal); if (!tempVal.equals(OBJECT_NOT_FOUND)) { if (i ==0) @@ -410,11 +410,11 @@ public class ConfigProcessors { value = value - Float.valueOf(tempVal); } } - LOG.info("value ==" + value ); + log.info("value ==" + value ); setEventObjectVal(field, value, "number"); } else - LOG.info("Filter not met"); + log.info("Filter not met"); } /** @@ -455,7 +455,7 @@ public class ConfigProcessors { //log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "split1==" + splitVal[1]); if (getEventObjectVal(key).toString().matches(splitVal[1])) { - LOG.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false"); + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false"); return false; } } @@ -463,7 +463,7 @@ public class ConfigProcessors { { if (!(getEventObjectVal(key).toString().matches(splitVal[1]))) { - LOG.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false"); + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false"); return false; } } @@ -477,7 +477,7 @@ public class ConfigProcessors { //log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "split1==" + splitVal[1]); if (getEventObjectVal(key).toString().contains(splitVal[1])) { - LOG.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false"); + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false"); return false; } } @@ -485,7 +485,7 @@ public class ConfigProcessors { { if (!(getEventObjectVal(key).toString().contains(splitVal[1]))) { - LOG.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false"); + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false"); return false; } } @@ -498,7 +498,7 @@ public class ConfigProcessors { { if(getEventObjectVal(key).toString().equals(filterValue)) { - LOG.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false"); + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false"); return false; } } @@ -506,7 +506,7 @@ public class ConfigProcessors { { if(!(getEventObjectVal(key).toString().equals(filterValue))) { - LOG.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false"); + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false"); return false; } } @@ -570,7 +570,7 @@ public class ConfigProcessors { if (keySeriesObj instanceof String) { //keySeriesObj = keySeriesObj.get(keySet[i]); - LOG.info("STRING==" + keySeriesObj); + log.info("STRING==" + keySeriesObj); } else if (keySeriesObj instanceof JSONArray) { keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i])); @@ -582,7 +582,7 @@ public class ConfigProcessors { } else { - LOG.info("unknown object==" + keySeriesObj); + log.info("unknown object==" + keySeriesObj); } } } @@ -608,7 +608,7 @@ public class ConfigProcessors { { keySeriesStr = keySeriesStr.replaceAll("\\.\\.", "."); } - LOG.info("fieldType==" + fieldType); + log.info("fieldType==" + fieldType); //log.info(Integer.toString(keySeriesStr.lastIndexOf("."))); //log.info(Integer.toString(keySeriesStr.length() -1)); if (keySeriesStr.lastIndexOf(".") == keySeriesStr.length() -1 ) @@ -622,7 +622,7 @@ public class ConfigProcessors { //keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i])); if (((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i])) == null) //if the object is not there then add it { - LOG.info("Object is null, must add it"); + log.info("Object is null, must add it"); if (keySet[i+1].matches("[0-9]*")) // if index then array ((JSONArray) keySeriesObj).put(Integer.parseInt(keySet[i]), new JSONArray()); else @@ -638,14 +638,14 @@ public class ConfigProcessors { ((JSONObject) keySeriesObj).put(keySet[i], new JSONArray()); else ((JSONObject) keySeriesObj).put(keySet[i], new JSONObject()); - LOG.info("Object is null, must add it"); + log.info("Object is null, must add it"); } keySeriesObj = ( (JSONObject) keySeriesObj).opt(keySet[i]); //log.info("JSONObject==" + keySeriesObj); } else { - LOG.info("unknown object==" + keySeriesObj); + log.info("unknown object==" + keySeriesObj); } } if (fieldType.equals("number") ) |