diff options
author | Zlatko Murgoski <zlatko.murgoski@nokia.com> | 2019-05-09 11:21:14 +0200 |
---|---|---|
committer | Zlatko Murgoski <zlatko.murgoski@nokia.com> | 2019-06-14 11:56:34 +0200 |
commit | 3c3c7ad09c02852cd0b4db03ecc9cc5c429cab08 (patch) | |
tree | 80221b2d825d878b6b6e860a5bae55f9317f9ec3 /src/main/java/org/onap/dcae/restapi/VesRestController.java | |
parent | 2bff7994a2bf880694a4c967b488ce55f3911af2 (diff) |
VES Collector - Event Ordering
https://jira.onap.org/browse/DCAEGEN2-1483
Change-Id: I28b0e871ce570a3cf4c0d2e08d040b66eb6db3aa
Issue-ID: DCAEGEN2-1483
Signed-off-by: Zlatko Murgoski <zlatko.murgoski@nokia.com>
Diffstat (limited to 'src/main/java/org/onap/dcae/restapi/VesRestController.java')
-rw-r--r-- | src/main/java/org/onap/dcae/restapi/VesRestController.java | 179 |
1 files changed, 43 insertions, 136 deletions
diff --git a/src/main/java/org/onap/dcae/restapi/VesRestController.java b/src/main/java/org/onap/dcae/restapi/VesRestController.java index 3102c31c..b18eb7bc 100644 --- a/src/main/java/org/onap/dcae/restapi/VesRestController.java +++ b/src/main/java/org/onap/dcae/restapi/VesRestController.java @@ -21,32 +21,27 @@ package org.onap.dcae.restapi; -import static java.util.stream.StreamSupport.stream; import static org.springframework.http.ResponseEntity.accepted; +import static org.springframework.http.ResponseEntity.badRequest; import com.att.nsa.clock.SaClock; import com.att.nsa.logging.LoggingContext; import com.att.nsa.logging.log4j.EcompFields; -import com.github.fge.jackson.JsonLoader; -import com.github.fge.jsonschema.core.report.ProcessingReport; -import com.github.fge.jsonschema.main.JsonSchema; - +import java.util.Optional; import java.util.UUID; -import java.util.concurrent.LinkedBlockingQueue; import javax.servlet.http.HttpServletRequest; - import org.json.JSONArray; import org.json.JSONObject; -import org.onap.dcae.ApplicationException; import org.onap.dcae.ApplicationSettings; +import org.onap.dcae.common.EventSender; import org.onap.dcae.common.VESLogger; +import org.onap.dcae.common.EventUpdater; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; @@ -54,152 +49,64 @@ import org.springframework.web.bind.annotation.RestController; @RestController public class VesRestController { - private static final Logger log = LoggerFactory.getLogger(VesRestController.class); - private static final String INVALID_JSON = ApiException.INVALID_JSON_INPUT.toJSON().toString(); - private final ApplicationSettings applicationSettings; - private final LinkedBlockingQueue<JSONObject> inputQueue; - private final Logger metricsLog; - private final Logger errorLog; - private final Logger incomingRequestsLogger; + private static final String VES_EVENT_MESSAGE = "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'"; + private static final String EVENT_LIST = "eventList"; + private static final String EVENT = "event"; + private final ApplicationSettings settings; + private final Logger requestLogger; + private EventSender eventSender; @Autowired - VesRestController(ApplicationSettings applicationSettings, - @Qualifier("metricsLog") Logger metricsLog, - @Qualifier("errorLog") Logger errorLog, + VesRestController(ApplicationSettings settings, @Qualifier("incomingRequestsLogger") Logger incomingRequestsLogger, - @Qualifier("inputQueue") LinkedBlockingQueue<JSONObject> inputQueue) { - this.applicationSettings = applicationSettings; - this.metricsLog = metricsLog; - this.errorLog = errorLog; - this.incomingRequestsLogger = incomingRequestsLogger; - this.inputQueue = inputQueue; - } - - @GetMapping("/") - String mainPage() { - return "Welcome to VESCollector"; + @Qualifier("eventSender") EventSender eventSender) { + this.settings = settings; + this.requestLogger = incomingRequestsLogger; + this.eventSender = eventSender; } - //refactor in next iteration - @PostMapping(value = {"/eventListener/v1", - "/eventListener/v1/eventBatch", - "/eventListener/v2", - "/eventListener/v2/eventBatch", - "/eventListener/v3", - "/eventListener/v3/eventBatch", - "/eventListener/v4", - "/eventListener/v4/eventBatch", - "/eventListener/v5", - "/eventListener/v5/eventBatch", - "/eventListener/v7", - "/eventListener/v7/eventBatch"}, consumes = "application/json") - ResponseEntity<String> receiveEvent(@RequestBody String jsonPayload, HttpServletRequest httpServletRequest) { - String request = httpServletRequest.getRequestURI(); - String version = extractVersion(request); - - JSONObject jsonObject; - try { - jsonObject = new JSONObject(jsonPayload); - } catch (Exception e) { - log.error(INVALID_JSON); - return ResponseEntity.badRequest().body(INVALID_JSON); - } - - String uuid = setUpECOMPLoggingForRequest(); - incomingRequestsLogger.info(String.format( - "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'", - jsonObject, uuid, version, httpServletRequest.getRemoteHost())); - - if (applicationSettings.jsonSchemaValidationEnabled()) { - if (isBatchRequest(request) && (jsonObject.has("eventList") && (!jsonObject.has("event")))) { - if (!conformsToSchema(jsonObject, version)) { - return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED); - } - } else if (!isBatchRequest(request) && (!jsonObject.has("eventList") && (jsonObject.has("event")))) { - if (!conformsToSchema(jsonObject, version)) { - return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED); - } - } else { - return errorResponse(ApiException.INVALID_JSON_INPUT); - } + @PostMapping(value = {"/eventListener/{version}"}, consumes = "application/json") + ResponseEntity<String> event(@RequestBody String event, @PathVariable String version, HttpServletRequest request) { + if (settings.isVersionSupported(version)) { + return process(event, version, request, EVENT); } + return badRequest().contentType(MediaType.APPLICATION_JSON).body(String.format("API version %s is not supported", version)); + } - JSONArray commonlyFormatted = convertToJSONArrayCommonFormat(jsonObject, request, uuid, version); - if (!putEventsOnProcessingQueue(commonlyFormatted)) { - errorLog.error("EVENT_RECEIPT_FAILURE: QueueFull " + ApiException.NO_SERVER_RESOURCES); - return errorResponse(ApiException.NO_SERVER_RESOURCES); + @PostMapping(value = {"/eventListener/{version}/eventBatch"}, consumes = "application/json") + ResponseEntity<String> events(@RequestBody String events, @PathVariable String version, HttpServletRequest request) { + if (settings.isVersionSupported(version)) { + return process(events, version, request, EVENT_LIST); } - return accepted() - .contentType(MediaType.APPLICATION_JSON) - .body("Accepted"); + return badRequest().contentType(MediaType.APPLICATION_JSON).body(String.format("API version %s is not supported", version)); } - private String extractVersion(String httpServletRequest) { - return httpServletRequest.split("/")[2]; - } + private ResponseEntity<String> process(String events, String version, HttpServletRequest request, String type) { - private ResponseEntity<String> errorResponse(ApiException noServerResources) { - return ResponseEntity.status(noServerResources.httpStatusCode) - .body(noServerResources.toJSON().toString()); - } + JSONObject jsonObject = new JSONObject(events); - private boolean putEventsOnProcessingQueue(JSONArray arrayOfEvents) { - for (int i = 0; i < arrayOfEvents.length(); i++) { - metricsLog.info("EVENT_PUBLISH_START"); - if (!inputQueue.offer((JSONObject) arrayOfEvents.get(i))) { - return false; - } - } - log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); - metricsLog.info("EVENT_PUBLISH_END"); - return true; - } + EventValidator eventValidator = new EventValidator(settings); + Optional<ResponseEntity<String>> validationResult = eventValidator.validate(jsonObject, type, version); - private boolean conformsToSchema(JSONObject payload, String version) { - try { - JsonSchema schema = applicationSettings.jsonSchema(version); - ProcessingReport report = schema.validate(JsonLoader.fromString(payload.toString())); - if (!report.isSuccess()) { - log.warn("Schema validation failed for event: " + payload); - stream(report.spliterator(), false).forEach(e -> log.warn(e.getMessage())); - return false; - } - return report.isSuccess(); - } catch (Exception e) { - throw new ApplicationException("Unable to validate against schema", e); + if (validationResult.isPresent()){ + return validationResult.get(); } + JSONArray arrayOfEvents = new EventUpdater(settings).convert(jsonObject,version, generateUUID(version, request.getRequestURI(), jsonObject), type); + eventSender.send(arrayOfEvents); + // TODO call service and return status, replace CambriaClient, split event to single object and list of them + return accepted().contentType(MediaType.APPLICATION_JSON).body("Accepted"); } - private static JSONArray convertToJSONArrayCommonFormat(JSONObject jsonObject, String request, - String uuid, String version) { - JSONArray asArrayEvents = new JSONArray(); - String vesUniqueIdKey = "VESuniqueId"; - String vesVersionKey = "VESversion"; - if (isBatchRequest(request)) { - JSONArray events = jsonObject.getJSONArray("eventList"); - for (int i = 0; i < events.length(); i++) { - JSONObject event = new JSONObject().put("event", events.getJSONObject(i)); - event.put(vesUniqueIdKey, uuid + "-" + i); - event.put(vesVersionKey, version); - asArrayEvents.put(event); - } - } else { - jsonObject.put(vesUniqueIdKey, uuid); - jsonObject.put(vesVersionKey, version); - asArrayEvents = new JSONArray().put(jsonObject); - } - return asArrayEvents; + private UUID generateUUID(String version, String uri, JSONObject jsonObject) { + UUID uuid = UUID.randomUUID(); + setUpECOMPLoggingForRequest(uuid); + requestLogger.info(String.format(VES_EVENT_MESSAGE, jsonObject, uuid, version, uri)); + return uuid; } - private static String setUpECOMPLoggingForRequest() { - final UUID uuid = UUID.randomUUID(); + private static void setUpECOMPLoggingForRequest(UUID uuid) { LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - return uuid.toString(); - } - - private static boolean isBatchRequest(String request) { - return request.contains("eventBatch"); } }
\ No newline at end of file |