diff options
-rw-r--r-- | etc/DmaapConfig.json | 48 | ||||
-rwxr-xr-x | etc/collector.properties | 148 | ||||
-rw-r--r-- | etc/log4j.xml | 6 | ||||
-rw-r--r-- | src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java | 552 | ||||
-rw-r--r-- | src/test/java/org/onap/dcae/vestest/TestEventReceipt.java | 123 |
5 files changed, 506 insertions, 371 deletions
diff --git a/etc/DmaapConfig.json b/etc/DmaapConfig.json index fd38d093..f144b382 100644 --- a/etc/DmaapConfig.json +++ b/etc/DmaapConfig.json @@ -1,20 +1,28 @@ -{ - "channels": [ - { - "name": "ves_measurement", - "cambria.topic": "unauthenticated.SEC_MEASUREMENT_OUTPUT", - "class": "HpCambriaOutputStream", - "stripHpId": "true", - "type": "out", - "cambria.hosts": "onap.dmaap.org" - }, - { - "name": "ves_fault", - "cambria.topic": "unauthenticated.SEC_FAULT_OUTPUT", - "class": "HpCambriaOutputStream", - "stripHpId": "true", - "type": "out", - "cambria.hosts": "onap.dmaap.org" - } - ] -} +{
+ "channels": [
+ {
+ "name": "ves_measurement",
+ "cambria.topic": "unauthenticated.SEC_MEASUREMENT_OUTPUT",
+ "class": "HpCambriaOutputStream",
+ "stripHpId": "true",
+ "type": "out",
+ "cambria.hosts": "onap.dmaap.org"
+ },
+ {
+ "name": "ves_fault",
+ "cambria.topic": "unauthenticated.SEC_FAULT_OUTPUT",
+ "class": "HpCambriaOutputStream",
+ "stripHpId": "true",
+ "type": "out",
+ "cambria.hosts": "onap.dmaap.org"
+ },
+ {
+ "name": "ves_heartbeat",
+ "cambria.topic": "unauthenticated.SEC_FAULT_OUTPUT",
+ "class": "HpCambriaOutputStream",
+ "stripHpId": "true",
+ "type": "out",
+ "cambria.hosts": "onap.dmaap.org"
+ }
+ ]
+}
diff --git a/etc/collector.properties b/etc/collector.properties index 26c12936..251cb02a 100755 --- a/etc/collector.properties +++ b/etc/collector.properties @@ -1,74 +1,74 @@ -############################################################################### -## -## Collector Server config -## -## - Default values are shown as commented settings. -## -############################################################################### -## -## HTTP(S) service -## -## Normally: -## -## - 8080 is http service -## - https is disabled by default (-1) -## -## - At this time, the server always binds to 0.0.0.0 -## -## The default port when header.authflag is disabled (0) -collector.service.port=8080 - -## The secure port is required if header.authflag is set to 1 (true) -## Authentication is only supported via secure port -## When enabled - require valid keystore defined -collector.service.secure.port=8443 - -## The keystore must be setup per installation when secure port is configured -collector.keystore.file.location=../etc/keystore -collector.keystore.passwordfile=./etc/passwordfile -collector.keystore.alias=tomcat - - -############################################################################### -## Processing -## -## If there's a problem that prevents the collector from processing alarms, -## it's normally better to apply back pressure to the caller than to try to -## buffer beyond a reasonable size limit. With a limit, the server won't crash -## due to being out of memory, and the caller will get a 5xx reply saying the -## server is in trouble. -collector.inputQueue.maxPending=8096 - -## Schema Validation checkflag -## default no validation checkflag (-1) -## If enabled (1) - schemafile location must be specified -collector.schema.checkflag=1 -collector.schema.file={\"v1\":\"./etc/CommonEventFormat_27.2.json\",\"v2\":\"./etc/CommonEventFormat_27.2.json\",\"v3\":\"./etc/CommonEventFormat_27.2.json\",\"v4\":\"./etc/CommonEventFormat_27.2.json\",\"v5\":\"./etc/CommonEventFormat_28.4.1.json\"} - -## List all streamid per domain to be supported. The streamid should match to channel name on dmaapfile -collector.dmaap.streamid=fault=ves_fault,ves_fault_secondary|syslog=ves_syslog,ves_syslog_secondary|heartbeat=ves_heartbeat,ves_heartbeat_secondary|measurementsForVfScaling=ves_measurement,ves_measurement_secondary|mobileFlow=ves_mobileflow,ves_mobileflow_secondary|other=ves_other,ves_other_secondary|stateChange=ves_statechange,ves_statechange_secondary|thresholdCrossingAlert=ves_thresholdCrossingAlert,ves_thresholdCrossingAlert_secondary|voiceQuality=ves_voicequality,ves_voicequality_secondary|sipSignaling=ves_sipsignaling,ves_sipsignaling_secondary -collector.dmaapfile=./etc/DmaapConfig.json - -## Custom ExceptionConfiguration -exceptionConfig=./etc/ExceptionConfig.json - -## authflag control authentication by the collector -## If enabled (1) - then authlist has to be defined -## When authflag is enabled, only secure port will be supported -## To disable enter 0 -header.authflag=1 -## Combination of userid,base64 encoded pwd list to be supported -## userid and pwd comma separated; pipe delimitation between each pair -header.authlist=sample1,c2FtcGxlMQ== - -## Event transformation Flag - when set expects configurable transformation -## defined under ./etc/eventTransform.json -## Enabled by default; to disable set to 0 -event.transform.flag=1 - -############################################################################### -## -## Tomcat control -## -#tomcat.maxthreads=(tomcat default, which is usually 200) - +###############################################################################
+##
+## Collector Server config
+##
+## - Default values are shown as commented settings.
+##
+###############################################################################
+##
+## HTTP(S) service
+##
+## Normally:
+##
+## - 8080 is http service
+## - https is disabled by default (-1)
+##
+## - At this time, the server always binds to 0.0.0.0
+##
+## The default port when header.authflag is disabled (0)
+collector.service.port=8080
+
+## The secure port is required if header.authflag is set to 1 (true)
+## Authentication is only supported via secure port
+## When enabled - require valid keystore defined
+#ccollector.service.secure.port=8443
+
+## The keystore must be setup per installation when secure port is configured
+collector.keystore.file.location=../etc/keystore
+collector.keystore.passwordfile=./etc/passwordfile
+collector.keystore.alias=tomcat
+
+
+###############################################################################
+## Processing
+##
+## If there's a problem that prevents the collector from processing alarms,
+## it's normally better to apply back pressure to the caller than to try to
+## buffer beyond a reasonable size limit. With a limit, the server won't crash
+## due to being out of memory, and the caller will get a 5xx reply saying the
+## server is in trouble.
+collector.inputQueue.maxPending=8096
+
+## Schema Validation checkflag
+## default no validation checkflag (-1)
+## If enabled (1) - schemafile location must be specified
+collector.schema.checkflag=1
+collector.schema.file={\"v1\":\"./etc/CommonEventFormat_27.2.json\",\"v2\":\"./etc/CommonEventFormat_27.2.json\",\"v3\":\"./etc/CommonEventFormat_27.2.json\",\"v4\":\"./etc/CommonEventFormat_27.2.json\",\"v5\":\"./etc/CommonEventFormat_28.4.1.json\"}
+
+## List all streamid per domain to be supported. The streamid should match to channel name on dmaapfile
+collector.dmaap.streamid=fault=ves_fault,ves_fault_secondary|syslog=ves_syslog,ves_syslog_secondary|heartbeat=ves_heartbeat,ves_heartbeat_secondary|measurementsForVfScaling=ves_measurement,ves_measurement_secondary|mobileFlow=ves_mobileflow,ves_mobileflow_secondary|other=ves_other,ves_other_secondary|stateChange=ves_statechange,ves_statechange_secondary|thresholdCrossingAlert=ves_thresholdCrossingAlert,ves_thresholdCrossingAlert_secondary|voiceQuality=ves_voicequality,ves_voicequality_secondary|sipSignaling=ves_sipsignaling,ves_sipsignaling_secondary
+collector.dmaapfile=./etc/DmaapConfig.json
+
+## Custom ExceptionConfiguration
+exceptionConfig=./etc/ExceptionConfig.json
+
+## authflag control authentication by the collector
+## If enabled (1) - then authlist has to be defined
+## When authflag is enabled, only secure port will be supported
+## To disable enter 0
+header.authflag=0
+## Combination of userid,base64 encoded pwd list to be supported
+## userid and pwd comma separated; pipe delimitation between each pair
+header.authlist=sample1,c2FtcGxlMQ==
+
+## Event transformation Flag - when set expects configurable transformation
+## defined under ./etc/eventTransform.json
+## Enabled by default; to disable set to 0
+event.transform.flag=1
+
+###############################################################################
+##
+## Tomcat control
+##
+#tomcat.maxthreads=(tomcat default, which is usually 200)
+
diff --git a/etc/log4j.xml b/etc/log4j.xml index dc5fe100..baf03559 100644 --- a/etc/log4j.xml +++ b/etc/log4j.xml @@ -1,3 +1,6 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> + <!-- ================================================================================ Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. @@ -17,8 +20,7 @@ limitations under the License. ECOMP is a trademark and service mark of AT&T Intellectual Property. --> -<?xml version="1.0" encoding="UTF-8"?> -<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> + <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false"> diff --git a/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java b/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java index 8d2ecaa1..f45f60c3 100644 --- a/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java +++ b/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java @@ -1,275 +1,277 @@ -/*- - * ============LICENSE_START======================================================= - * PROJECT - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcae.restapi.endpoints; - -import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint; -import com.att.nsa.clock.SaClock; -import com.att.nsa.drumlin.service.framework.context.DrumlinRequestContext; -import com.att.nsa.drumlin.service.standards.HttpStatusCodes; -import com.att.nsa.drumlin.service.standards.MimeTypes; -import com.att.nsa.logging.LoggingContext; -import com.att.nsa.logging.log4j.EcompFields; -import com.att.nsa.security.db.simple.NsaSimpleApiKey; -import com.google.gson.JsonParser; -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; -import org.json.JSONTokener; -import org.onap.dcae.commonFunction.CommonStartup; -import org.onap.dcae.commonFunction.CommonStartup.QueueFullException; -import org.onap.dcae.commonFunction.CustomExceptionLoader; -import org.onap.dcae.commonFunction.VESLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.FileReader; -import java.io.IOException; -import java.io.InputStream; -import java.util.UUID; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -public class EventReceipt extends NsaBaseEndpoint { - - private static final Logger log = LoggerFactory.getLogger(EventReceipt.class); - private static final String MESSAGE = " Message:"; - static String valresult; - static JSONObject customerror; - - public static void receiveVESEvent(DrumlinRequestContext ctx) { - // the request body carries events. assume for now it's an array - // of json objects that fits in memory. (See cambria's parsing for - // handling large messages) - - NsaSimpleApiKey retkey = null; - - JSONArray jsonArray; - JSONArray jsonArrayMod = new JSONArray(); - JSONObject event; - JSONObject jsonObject; - FileReader fr = null; - InputStream istr = null; - int arrayFlag = 0; - String vesVersion = null; - - try { - // System.out.print("Version string:" + version); - - // String br = new BufferedReader(new - // InputStreamReader(ctx.request().getBodyStream())).readLine(); - // JsonElement msg = new JsonParser().parse(new BufferedReader(new - // InputStreamReader(ctx.request().getBodyStream())).readLine()); - // jsonArray = new JSONArray ( new JSONTokener ( - // ctx.request().getBodyStream () ) ); - - log.debug("Request recieved :" + ctx.request().getRemoteAddress()); - istr = ctx.request().getBodyStream(); - jsonObject = new JSONObject(new JSONTokener(istr)); - - log.info("ctx getPathInContext: " + ctx.request().getPathInContext()); - Pattern p = Pattern.compile("(v\\d+)"); - Matcher m = p.matcher(ctx.request().getPathInContext()); - - if (m.find()) { - log.info("VES version:" + m.group()); - vesVersion = m.group(); - m = null; - p = null; - - } - - final UUID uuid = UUID.randomUUID(); - LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); - localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - - if (ctx.request().getPathInContext().contains("eventBatch")) { - CommonStartup.inlog.info(ctx.request().getRemoteAddress() + "VESUniqueID-Prefix:" + uuid - + " VES Batch Input Messsage: " + jsonObject); - log.info(ctx.request().getRemoteAddress() + "VESUniqueID-Prefix:" + uuid + " VES Batch Input Messsage: " - + jsonObject); - arrayFlag = 1; - } else { - CommonStartup.inlog.info( - ctx.request().getRemoteAddress() + "VESUniqueID:" + uuid + " Input Messsage: " + jsonObject); - log.info(ctx.request().getRemoteAddress() + "VESUniqueID:" + uuid + " Input Messsage: " + jsonObject); - - } - - try { - if (CommonStartup.authflag == 1) { - retkey = NsaBaseEndpoint.getAuthenticatedUser(ctx); - } - } catch (NullPointerException x) { - log.info("Invalid user request " + ctx.request().getContentType() + MESSAGE + jsonObject); - CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Unauthorized user" + x); - respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Invalid user"); - return; - } - - if (retkey != null || CommonStartup.authflag == 0) { - if (CommonStartup.schemaValidatorflag > 0) { - if ((arrayFlag == 1) && (jsonObject.has("eventList") && (!jsonObject.has("event"))) - || ((arrayFlag == 0) && (!jsonObject.has("eventList") && (jsonObject.has("event"))))) { - fr = new FileReader(schemaFileVersion(vesVersion)); - String schema = new JsonParser().parse(fr).toString(); - - valresult = CommonStartup.schemavalidate(jsonObject.toString(), schema); - if (valresult.equals("true")) { - log.info("Validation successful"); - } else if (valresult.equals("false")) { - log.info("Validation failed"); - respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, - "Schema validation failed"); - return; - } else { - log.error("Validation errored" + valresult); - respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, - "Couldn't parse JSON object"); - return; - } - } else { - log.info("Validation failed"); - respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Schema validation failed"); - return; - } - if (arrayFlag == 1) { - jsonArray = jsonObject.getJSONArray("eventList"); - log.info("Validation successful for all events in batch"); - for (int i = 0; i < jsonArray.length(); i++) { - event = new JSONObject().put("event", jsonArray.getJSONObject(i)); - event.put("VESuniqueId", uuid + "-" + i); - event.put("VESversion", vesVersion); - jsonArrayMod.put(event); - } - log.info("Modified jsonarray:" + jsonArrayMod.toString()); - } else { - jsonObject.put("VESuniqueId", uuid); - jsonObject.put("VESversion", vesVersion); - jsonArrayMod = new JSONArray().put(jsonObject); - } - } - - // reject anything that's not JSON - if (!ctx.request().getContentType().equalsIgnoreCase("application/json")) { - log.info(String.format("Rejecting request with content type %s Message:%s", - ctx.request().getContentType(), jsonObject)); - respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, - "Incorrect message content-type; only accepts application/json messages"); - return; - } - - CommonStartup.handleEvents(jsonArrayMod); - } else { - log.info(String.format("Unauthorized request %s%s%s", ctx.request().getContentType(), MESSAGE, - jsonObject)); - respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Unauthorized user"); - return; - } - } catch (JSONException | NullPointerException | IOException x) { - log.error(String.format("Couldn't parse JSON Array - HttpStatusCodes.k400_badRequest%d%s%s", - HttpStatusCodes.k400_badRequest, MESSAGE, x.getMessage())); - CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request " + x); - respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Couldn't parse JSON object"); - return; - } catch (QueueFullException e) { - log.error("Collector internal queue full :" + e.getMessage(), e); - CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e); - respondWithCustomMsginJson(ctx, HttpStatusCodes.k503_serviceUnavailable, "Queue full"); - return; - } finally { - if (fr != null) { - safeClose(fr); - } - - if (istr != null) { - safeClose(istr); - } - } - log.info("MessageAccepted and k200_ok to be sent"); - ctx.response().sendErrorAndBody(HttpStatusCodes.k200_ok, "Message Accepted", MimeTypes.kAppJson); - } - - public static void respondWithCustomMsginJson(DrumlinRequestContext ctx, int sc, String msg) { - String[] str; - String exceptionType = "GeneralException"; - - str = CustomExceptionLoader.LookupMap(String.valueOf(sc), msg); - log.info("Post CustomExceptionLoader.LookupMap" + str); - - if (str != null) { - - if (str[0].matches("SVC")) { - exceptionType = "ServiceException"; - } else if (str[1].matches("POL")) { - exceptionType = "PolicyException"; - } - - JSONObject jb = new JSONObject().put("requestError", - new JSONObject().put(exceptionType, new JSONObject().put("MessagID", str[0]).put("text", str[1]))); - - log.debug("Constructed json error : " + jb); - ctx.response().sendErrorAndBody(sc, jb.toString(), MimeTypes.kAppJson); - } else { - JSONObject jb = new JSONObject().put("requestError", - new JSONObject().put(exceptionType, new JSONObject().put("Status", sc).put("Error", msg))); - ctx.response().sendErrorAndBody(sc, jb.toString(), MimeTypes.kAppJson); - } - - } - - public static void safeClose(FileReader fr) { - if (fr != null) { - try { - fr.close(); - } catch (IOException e) { - log.error("Error closing file reader stream : " + e); - } - } - - } - - public static void safeClose(InputStream is) { - if (is != null) { - try { - is.close(); - } catch (IOException e) { - log.error("Error closing Input stream : " + e); - } - } - - } - - public static String schemaFileVersion(String version) { - String filename = null; - - if (CommonStartup.schemaFileJson.has(version)) { - filename = CommonStartup.schemaFileJson.getString(version); - } else { - filename = CommonStartup.schemaFileJson.getString("v5"); - - } - log.info(String.format("VESversion: %s Schema File:%s", version, filename)); - return filename; - - } - -} - +/*-
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.restapi.endpoints;
+
+import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;
+import com.att.nsa.clock.SaClock;
+import com.att.nsa.drumlin.service.framework.context.DrumlinRequestContext;
+import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
+import com.att.nsa.drumlin.service.standards.MimeTypes;
+import com.att.nsa.logging.LoggingContext;
+import com.att.nsa.logging.log4j.EcompFields;
+import com.att.nsa.security.db.simple.NsaSimpleApiKey;
+import com.google.gson.JsonParser;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.onap.dcae.commonFunction.CommonStartup;
+import org.onap.dcae.commonFunction.CommonStartup.QueueFullException;
+import org.onap.dcae.commonFunction.CustomExceptionLoader;
+import org.onap.dcae.commonFunction.VESLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class EventReceipt extends NsaBaseEndpoint {
+
+ private static final Logger log = LoggerFactory.getLogger(EventReceipt.class);
+ private static final String MESSAGE = " Message:";
+ static String valresult;
+ static JSONObject customerror;
+
+ public static void receiveVESEvent(DrumlinRequestContext ctx) {
+ // the request body carries events. assume for now it's an array
+ // of json objects that fits in memory. (See cambria's parsing for
+ // handling large messages)
+
+ NsaSimpleApiKey retkey = null;
+
+
+ JSONObject jsonObject;
+ FileReader fr = null;
+ InputStream istr = null;
+ int arrayFlag = 0;
+ String vesVersion = null;
+
+ try {
+
+
+ log.debug("Request recieved :" + ctx.request().getRemoteAddress());
+ istr = ctx.request().getBodyStream();
+ jsonObject = new JSONObject(new JSONTokener(istr));
+
+ log.info("ctx getPathInContext: " + ctx.request().getPathInContext());
+ Pattern p = Pattern.compile("(v\\d+)");
+ Matcher m = p.matcher(ctx.request().getPathInContext());
+
+ if (m.find()) {
+ log.info("VES version:" + m.group());
+ vesVersion = m.group();
+ m = null;
+ p = null;
+
+ }
+
+ final UUID uuid = UUID.randomUUID();
+ LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
+ localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
+
+ if (ctx.request().getPathInContext().contains("eventBatch")) {
+ CommonStartup.inlog.info(ctx.request().getRemoteAddress() + "VESUniqueID-Prefix:" + uuid
+ + " VES Batch Input Messsage: " + jsonObject);
+ log.info(ctx.request().getRemoteAddress() + "VESUniqueID-Prefix:" + uuid + " VES Batch Input Messsage: "
+ + jsonObject);
+ arrayFlag = 1;
+ } else {
+ CommonStartup.inlog.info(
+ ctx.request().getRemoteAddress() + "VESUniqueID:" + uuid + " Input Messsage: " + jsonObject);
+ log.info(ctx.request().getRemoteAddress() + "VESUniqueID:" + uuid + " Input Messsage: " + jsonObject);
+
+ }
+
+ try {
+ if (CommonStartup.authflag == 1) {
+ retkey = NsaBaseEndpoint.getAuthenticatedUser(ctx);
+ }
+ } catch (NullPointerException x) {
+ log.info("Invalid user request " + ctx.request().getContentType() + MESSAGE + jsonObject);
+ CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Unauthorized user" + x);
+ respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Invalid user");
+ return;
+ }
+
+ schemaCheck( retkey, arrayFlag, jsonObject, vesVersion, ctx, uuid);
+
+ } catch (JSONException | NullPointerException | IOException x) {
+ log.error(String.format("Couldn't parse JSON Array - HttpStatusCodes.k400_badRequest%d%s%s",
+ HttpStatusCodes.k400_badRequest, MESSAGE, x.getMessage()));
+ CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request " + x);
+ respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Couldn't parse JSON object");
+ return;
+ } catch (QueueFullException e) {
+ log.error("Collector internal queue full :" + e.getMessage(), e);
+ CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e);
+ respondWithCustomMsginJson(ctx, HttpStatusCodes.k503_serviceUnavailable, "Queue full");
+ return;
+ } finally {
+ if (fr != null) {
+ safeClose(fr);
+ }
+
+ if (istr != null) {
+ safeClose(istr);
+ }
+ }
+ log.info("MessageAccepted and k200_ok to be sent");
+ ctx.response().sendErrorAndBody(HttpStatusCodes.k200_ok, "Message Accepted", MimeTypes.kAppJson);
+ }
+
+ public static void schemaCheck(NsaSimpleApiKey retkey, int arrayFlag,JSONObject jsonObject, String vesVersion, DrumlinRequestContext ctx, UUID uuid) throws JSONException, QueueFullException, IOException
+ {
+ JSONArray jsonArray;
+ JSONArray jsonArrayMod = new JSONArray();
+ JSONObject event;
+ FileReader fr;
+ if (retkey != null || CommonStartup.authflag == 0) {
+ if (CommonStartup.schemaValidatorflag > 0) {
+ if ((arrayFlag == 1) && (jsonObject.has("eventList") && (!jsonObject.has("event")))
+ || ((arrayFlag == 0) && (!jsonObject.has("eventList") && (jsonObject.has("event"))))) {
+ fr = new FileReader(schemaFileVersion(vesVersion));
+ String schema = new JsonParser().parse(fr).toString();
+
+ valresult = CommonStartup.schemavalidate(jsonObject.toString(), schema);
+ if (valresult.equals("true")) {
+ log.info("Validation successful");
+ } else if (valresult.equals("false")) {
+ log.info("Validation failed");
+ respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,
+ "Schema validation failed");
+ return;
+ } else {
+ log.error("Validation errored" + valresult);
+ respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,
+ "Couldn't parse JSON object");
+ return;
+ }
+ } else {
+ log.info("Validation failed");
+ respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Schema validation failed");
+ return;
+ }
+ if (arrayFlag == 1) {
+ jsonArray = jsonObject.getJSONArray("eventList");
+ log.info("Validation successful for all events in batch");
+ for (int i = 0; i < jsonArray.length(); i++) {
+ event = new JSONObject().put("event", jsonArray.getJSONObject(i));
+ event.put("VESuniqueId", uuid + "-" + i);
+ event.put("VESversion", vesVersion);
+ jsonArrayMod.put(event);
+ }
+ log.info("Modified jsonarray:" + jsonArrayMod.toString());
+ } else {
+ jsonObject.put("VESuniqueId", uuid);
+ jsonObject.put("VESversion", vesVersion);
+ jsonArrayMod = new JSONArray().put(jsonObject);
+ }
+ }
+
+ // reject anything that's not JSON
+ if (!ctx.request().getContentType().equalsIgnoreCase("application/json")) {
+ log.info(String.format("Rejecting request with content type %s Message:%s",
+ ctx.request().getContentType(), jsonObject));
+ respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,
+ "Incorrect message content-type; only accepts application/json messages");
+ return;
+ }
+
+ CommonStartup.handleEvents(jsonArrayMod);
+ } else {
+ log.info(String.format("Unauthorized request %s%s%s", ctx.request().getContentType(), MESSAGE,
+ jsonObject));
+ respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Unauthorized user");
+ return;
+ }
+ }
+
+ public static void respondWithCustomMsginJson(DrumlinRequestContext ctx, int sc, String msg) {
+ String[] str;
+ String exceptionType = "GeneralException";
+
+ str = CustomExceptionLoader.LookupMap(String.valueOf(sc), msg);
+ log.info("Post CustomExceptionLoader.LookupMap" + str);
+
+ if (str != null) {
+
+ if (str[0].matches("SVC")) {
+ exceptionType = "ServiceException";
+ } else if (str[1].matches("POL")) {
+ exceptionType = "PolicyException";
+ }
+
+ JSONObject jb = new JSONObject().put("requestError",
+ new JSONObject().put(exceptionType, new JSONObject().put("MessagID", str[0]).put("text", str[1])));
+
+ log.debug("Constructed json error : " + jb);
+ ctx.response().sendErrorAndBody(sc, jb.toString(), MimeTypes.kAppJson);
+ } else {
+ JSONObject jb = new JSONObject().put("requestError",
+ new JSONObject().put(exceptionType, new JSONObject().put("Status", sc).put("Error", msg)));
+ ctx.response().sendErrorAndBody(sc, jb.toString(), MimeTypes.kAppJson);
+ }
+
+ }
+
+ public static void safeClose(FileReader fr) {
+ if (fr != null) {
+ try {
+ fr.close();
+ } catch (IOException e) {
+ log.error("Error closing file reader stream : " + e);
+ }
+ }
+
+ }
+
+ public static void safeClose(InputStream is) {
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException e) {
+ log.error("Error closing Input stream : " + e);
+ }
+ }
+
+ }
+
+ public static String schemaFileVersion(String version) {
+ String filename = null;
+
+ if (CommonStartup.schemaFileJson.has(version)) {
+ filename = CommonStartup.schemaFileJson.getString(version);
+ } else {
+ filename = CommonStartup.schemaFileJson.getString("v5");
+
+ }
+ log.info(String.format("VESversion: %s Schema File:%s", version, filename));
+ return filename;
+
+ }
+
+}
+
diff --git a/src/test/java/org/onap/dcae/vestest/TestEventReceipt.java b/src/test/java/org/onap/dcae/vestest/TestEventReceipt.java new file mode 100644 index 00000000..668c718a --- /dev/null +++ b/src/test/java/org/onap/dcae/vestest/TestEventReceipt.java @@ -0,0 +1,123 @@ +/*-
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.vestest;
+
+import static org.junit.Assert.assertEquals;
+
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.dcae.commonFunction.CommonStartup;
+import org.onap.dcae.commonFunction.CommonStartup.QueueFullException;
+import org.onap.dcae.commonFunction.CustomExceptionLoader;
+import org.onap.dcae.restapi.endpoints.EventReceipt;
+
+import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;
+import com.att.nsa.drumlin.service.framework.context.DrumlinRequestContext;
+import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
+import com.att.nsa.security.db.simple.NsaSimpleApiKey;
+
+import jline.internal.Log;
+
+public class TestEventReceipt extends NsaBaseEndpoint {
+
+ DrumlinRequestContext ctx;
+ JSONObject jsonObject;
+ Boolean flag = false;
+ String ev = "{\"event\": {\"commonEventHeader\": { \"reportingEntityName\": \"VM name will be provided by ECOMP\", \"startEpochMicrosec\": 1477012779802988,\"lastEpochMicrosec\": 1477012789802988,\"eventId\": \"83\",\"sourceName\": \"Dummy VM name - No Metadata available\",\"sequence\": 83,\"priority\": \"Normal\",\"functionalRole\": \"vFirewall\",\"domain\": \"measurementsForVfScaling\",\"reportingEntityId\": \"VM UUID will be provided by ECOMP\",\"sourceId\": \"Dummy VM UUID - No Metadata available\",\"version\": 1.1},\"measurementsForVfScalingFields\": {\"measurementInterval\": 10,\"measurementsForVfScalingVersion\": 1.1,\"vNicUsageArray\": [{\"multicastPacketsIn\": 0,\"bytesIn\": 3896,\"unicastPacketsIn\": 0, \"multicastPacketsOut\": 0,\"broadcastPacketsOut\": 0, \"packetsOut\": 28,\"bytesOut\": 12178,\"broadcastPacketsIn\": 0,\"packetsIn\": 58,\"unicastPacketsOut\": 0,\"vNicIdentifier\": \"eth0\"}]}}}";
+
+
+ @Before
+ public void setUp() throws Exception {
+
+
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testschemaFileVersion() {
+
+ String filename = null;
+ CommonStartup.schemaFileJson = new JSONObject(
+ "{\"v1\":\"./etc/CommonEventFormat_27.2.json\",\"v2\":\"./etc/CommonEventFormat_27.2.json\",\"v3\":\"./etc/CommonEventFormat_27.2.json\",\"v4\":\"./etc/CommonEventFormat_27.2.json\",\"v5\":\"./etc/CommonEventFormat_28.4.1.json\"}");
+ filename = EventReceipt.schemaFileVersion("v5");
+
+ if (!filename.isEmpty()) {
+ flag = true;
+ }
+ assertEquals(true, flag);
+ }
+
+ @Test
+ public void testrespondWithCustomMsginJson() {
+
+ CommonStartup.exceptionConfig = "./etc/ExceptionConfig.json";
+ CustomExceptionLoader.LoadMap();
+
+ try {
+ EventReceipt.respondWithCustomMsginJson(null, HttpStatusCodes.k401_unauthorized, "Unauthorized user");
+ }
+ catch (Exception e)
+ {
+ //As context object is null, handling null pointer exception.
+ Log.debug("Response object creation failure");
+ }
+ assertEquals(true, true);
+ }
+
+ @Test
+ public void testschemaCheck() {
+
+ // schemaCheck(NsaSimpleApiKey retkey, int arrayFlag,JSONObject
+ // jsonObject, String vesVersion, FileReader fr, DrumlinRequestContext
+ // ctx, UUID uuid) throws JSONException, QueueFullException, IOException
+ NsaSimpleApiKey retkey = null;
+ int arrayFlag = 0;
+
+ CommonStartup.authflag = 0;
+ CommonStartup.schemaValidatorflag = 1;
+
+ jsonObject = new org.json.JSONObject(ev);
+
+ String vesVersion = "v1";
+
+ DrumlinRequestContext ctx = null;
+
+
+ UUID uuid = UUID.randomUUID();
+
+ try {
+ EventReceipt.schemaCheck(retkey, arrayFlag, jsonObject, vesVersion, ctx, uuid);
+ } catch (NullPointerException |JSONException | QueueFullException | IOException e) {
+
+ Log.debug("Response object creation failure");
+ }
+ assertEquals(true, true);
+ }
+}
|