aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcae/restapi
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcae/restapi')
-rw-r--r--src/main/java/org/onap/dcae/restapi/ApiAuthInterceptor.java78
-rw-r--r--src/main/java/org/onap/dcae/restapi/ApiConfiguration.java (renamed from src/main/java/org/onap/dcae/restapi/endpoints/Ui.java)33
-rw-r--r--src/main/java/org/onap/dcae/restapi/ApiException.java22
-rw-r--r--src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java127
-rw-r--r--src/main/java/org/onap/dcae/restapi/ServletConfig.java79
-rw-r--r--src/main/java/org/onap/dcae/restapi/VesRestController.java214
-rw-r--r--src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java247
7 files changed, 406 insertions, 394 deletions
diff --git a/src/main/java/org/onap/dcae/restapi/ApiAuthInterceptor.java b/src/main/java/org/onap/dcae/restapi/ApiAuthInterceptor.java
new file mode 100644
index 00000000..864a16d7
--- /dev/null
+++ b/src/main/java/org/onap/dcae/restapi/ApiAuthInterceptor.java
@@ -0,0 +1,78 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.restapi;
+
+import io.vavr.control.Option;
+import org.onap.dcae.ApplicationSettings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.Base64;
+
+final class ApiAuthInterceptor extends HandlerInterceptorAdapter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ApiAuthInterceptor.class);
+ private final ApplicationSettings applicationSettings;
+
+ private Logger errorLog;
+
+ ApiAuthInterceptor(ApplicationSettings applicationSettings, Logger errorLog) {
+ this.applicationSettings = applicationSettings;
+ this.errorLog = errorLog;
+ }
+
+ @Override
+ public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
+ Object handler) throws IOException {
+ if (applicationSettings.authorizationEnabled()) {
+ String authorizationHeader = request.getHeader("Authorization");
+ if (authorizationHeader == null || !isAuthorized(authorizationHeader)) {
+ response.setStatus(400);
+ errorLog.error("EVENT_RECEIPT_FAILURE: Unauthorized user");
+ response.getWriter().write(ApiException.UNAUTHORIZED_USER.toJSON().toString());
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean isAuthorized(String authorizationHeader) {
+ try {
+ String encodedData = authorizationHeader.split(" ")[1];
+ String decodedData = new String(Base64.getDecoder().decode(encodedData));
+ String providedUser = decodedData.split(":")[0].trim();
+ String providedPassword = decodedData.split(":")[1].trim();
+ Option<String> maybeSavedPassword = applicationSettings.validAuthorizationCredentials().get(providedUser);
+ boolean userRegistered = maybeSavedPassword.isDefined();
+ return userRegistered && maybeSavedPassword.get().equals(providedPassword);
+ } catch (Exception e) {
+ LOG.warn(String.format("Could not check if user is authorized (header: '%s')), probably malformed header.",
+ authorizationHeader), e);
+ return false;
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/restapi/endpoints/Ui.java b/src/main/java/org/onap/dcae/restapi/ApiConfiguration.java
index ae593b44..85db81df 100644
--- a/src/main/java/org/onap/dcae/restapi/endpoints/Ui.java
+++ b/src/main/java/org/onap/dcae/restapi/ApiConfiguration.java
@@ -1,15 +1,16 @@
-/*-
+/*
* ============LICENSE_START=======================================================
* PROJECT
* ================================================================================
* Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.s
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,14 +19,28 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.restapi.endpoints;
+package org.onap.dcae.restapi;
-import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;
-import com.att.nsa.drumlin.service.framework.context.DrumlinRequestContext;
+import org.onap.dcae.ApplicationSettings;
+import org.slf4j.Logger;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
-public class Ui extends NsaBaseEndpoint {
+@Configuration
+public class ApiConfiguration implements WebMvcConfigurer {
+ private final ApplicationSettings applicationSettings;
+ private Logger errorLogger;
- public static void hello(DrumlinRequestContext ctx) {
- ctx.renderer().renderTemplate("templates/hello.html");
+ @Autowired
+ ApiConfiguration(ApplicationSettings applicationSettings, Logger errorLogger) {
+ this.applicationSettings = applicationSettings;
+ this.errorLogger = errorLogger;
+ }
+
+ @Override
+ public void addInterceptors(InterceptorRegistry registry) {
+ registry.addInterceptor(new ApiAuthInterceptor(applicationSettings, errorLogger));
}
}
diff --git a/src/main/java/org/onap/dcae/restapi/ApiException.java b/src/main/java/org/onap/dcae/restapi/ApiException.java
index 0f922678..53895ffe 100644
--- a/src/main/java/org/onap/dcae/restapi/ApiException.java
+++ b/src/main/java/org/onap/dcae/restapi/ApiException.java
@@ -33,10 +33,10 @@ public enum ApiException {
UNAUTHORIZED_USER(ExceptionType.POLICY_EXCEPTION, "POL2000", "Unauthorized user", 401),
NO_SERVER_RESOURCES(ExceptionType.SERVICE_EXCEPTION, "SVC1000", "No server resources (internal processing queue full)", 503);
+ public final int httpStatusCode;
private final ExceptionType type;
private final String code;
private final String details;
- public final int httpStatusCode;
ApiException(ExceptionType type, String code, String details, int httpStatusCode) {
this.type = type;
@@ -45,18 +45,9 @@ public enum ApiException {
this.httpStatusCode = httpStatusCode;
}
- public enum ExceptionType {
- SERVICE_EXCEPTION, POLICY_EXCEPTION;
-
- @Override
- public String toString() {
- return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, this.name());
- }
- }
-
public JSONObject toJSON() {
JSONObject exceptionTypeNode = new JSONObject();
- exceptionTypeNode.put("messageId", code );
+ exceptionTypeNode.put("messageId", code);
exceptionTypeNode.put("text", details);
JSONObject requestErrorNode = new JSONObject();
@@ -67,4 +58,13 @@ public enum ApiException {
return rootNode;
}
+ public enum ExceptionType {
+ SERVICE_EXCEPTION, POLICY_EXCEPTION;
+
+ @Override
+ public String toString() {
+ return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, this.name());
+ }
+ }
+
}
diff --git a/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java b/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java
deleted file mode 100644
index e5a29e9f..00000000
--- a/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java
+++ /dev/null
@@ -1,127 +0,0 @@
-
-/*
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.restapi;
-
-import java.io.IOException;
-import java.net.URL;
-
-import javax.servlet.ServletException;
-
-import org.apache.tomcat.util.codec.binary.Base64;
-import org.onap.dcae.ApplicationSettings;
-import org.onap.dcae.commonFunction.CommonStartup;
-import org.onap.dcae.commonFunction.VESLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.nsa.apiServer.CommonServlet;
-import com.att.nsa.configs.ConfigDbException;
-import com.att.nsa.drumlin.service.framework.routing.DrumlinRequestRouter;
-import com.att.nsa.drumlin.service.framework.routing.playish.DrumlinPlayishRoutingFileSource;
-import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.loadException;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
-import com.att.nsa.security.NsaAuthenticator;
-
-import com.att.nsa.security.authenticators.SimpleAuthenticator;
-import com.att.nsa.security.db.simple.NsaSimpleApiKey;
-
-public class RestfulCollectorServlet extends CommonServlet
-{
-
- private static final long serialVersionUID = 1L;
- private static final Logger log = LoggerFactory.getLogger ( RestfulCollectorServlet.class );
-
- private static String authCredentialsList;
-
- public RestfulCollectorServlet ( ApplicationSettings settings ) throws loadException, missingReqdSetting
- {
- super ( settings.torrNvReadable(), "collector", false );
- authCredentialsList = settings.validAuthorizationCredentials();
- }
-
-
-
-
- /**
- * This is called once at server start. Use it to init any shared objects and setup the route mapping.
- */
- @Override
- protected void servletSetup () throws rrNvReadable.missingReqdSetting, rrNvReadable.invalidSettingValue, ServletException
- {
- super.servletSetup ();
-
- try {
- // the base class provides a bunch of things like API authentication and ECOMP compliant
- // logging. The Restful Collector likely doesn't need API authentication, so for now,
- // we init the base class services with an in-memory (and empty!) config DB.
- commonServletSetup ( ConfigDbType.MEMORY );
-
- VESLogger.setUpEcompLogging();
-
- // setup the servlet routing and error handling
- final DrumlinRequestRouter drr = getRequestRouter ();
-
- // you can tell the request router what to do when a particular kind of exception is thrown.
- drr.setHandlerForException(IllegalArgumentException.class,
- (ctx, cause) -> sendJsonReply (ctx, HttpStatusCodes.k400_badRequest, cause.getMessage() ));
-
- // load the routes from the config file
- final URL routes = findStream ( "routes.conf" );
- if ( routes == null ) throw new rrNvReadable.missingReqdSetting ( "No routing configuration." );
- final DrumlinPlayishRoutingFileSource drs = new DrumlinPlayishRoutingFileSource ( routes );
- drr.addRouteSource ( drs );
-
- if (CommonStartup.authflag) {
- NsaAuthenticator<NsaSimpleApiKey> NsaAuth;
- NsaAuth = createAuthenticator(authCredentialsList);
-
- this.getSecurityManager().addAuthenticator(NsaAuth);
- }
-
- log.info ( "Restful Collector Servlet is up." );
- }
- catch ( SecurityException | IOException | ConfigDbException e ) {
- throw new ServletException ( e );
- }
- }
-
- public NsaAuthenticator<NsaSimpleApiKey> createAuthenticator(String authCredentials) {
- NsaAuthenticator<NsaSimpleApiKey> authenticator = new SimpleAuthenticator();
- if (authCredentials != null) {
- String authpair[] = authCredentials.split("\\|");
- for (String pair : authpair) {
- String lineid[] = pair.split(",");
- String listauthid = lineid[0];
- String listauthpwd = new String(Base64.decodeBase64(lineid[1]));
- ((SimpleAuthenticator) authenticator).add(listauthid, listauthpwd);
- }
-
- } else {
- ((SimpleAuthenticator) authenticator).add("admin", "collectorpasscode");
- }
- return authenticator;
- }
-
-}
-
diff --git a/src/main/java/org/onap/dcae/restapi/ServletConfig.java b/src/main/java/org/onap/dcae/restapi/ServletConfig.java
new file mode 100644
index 00000000..e8efa375
--- /dev/null
+++ b/src/main/java/org/onap/dcae/restapi/ServletConfig.java
@@ -0,0 +1,79 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.s
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.restapi;
+
+import org.onap.dcae.ApplicationSettings;
+import org.onap.dcae.SchemaValidator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.web.server.Ssl;
+import org.springframework.boot.web.server.WebServerFactoryCustomizer;
+import org.springframework.boot.web.servlet.server.ConfigurableServletWebServerFactory;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+
+import static java.nio.file.Files.readAllBytes;
+
+@Component
+public class ServletConfig implements WebServerFactoryCustomizer<ConfigurableServletWebServerFactory> {
+
+ private static final Logger log = LoggerFactory.getLogger(SchemaValidator.class);
+
+ @Autowired
+ private ApplicationSettings properties;
+
+ @Override
+ public void customize(ConfigurableServletWebServerFactory container) {
+ if (properties.authorizationEnabled()) {
+ container.setSsl(createSSL());
+ container.setPort(properties.httpsPort());
+ } else {
+ container.setPort(properties.httpPort());
+ }
+ }
+
+ private Ssl createSSL() {
+ log.info("Enabling SSL");
+ Ssl ssl = new Ssl();
+ ssl.setEnabled(true);
+ String keyStore = Paths.get(properties.keystoreFileLocation()).toAbsolutePath().toString();
+ log.info("Using keyStore path: " + keyStore);
+ ssl.setKeyStore(keyStore);
+ String keyPasswordFileLocation = Paths.get(properties.keystorePasswordFileLocation()).toAbsolutePath().toString();
+ log.info("Using keyStore password from: " + keyPasswordFileLocation);
+ ssl.setKeyPassword(getKeyStorePassword(keyPasswordFileLocation));
+ ssl.setKeyAlias(properties.keystoreAlias());
+ return ssl;
+ }
+
+ private String getKeyStorePassword(String location) {
+ try {
+ return new String(readAllBytes(Paths.get(location)));
+ } catch (IOException e) {
+ log.error("Could not read keystore password from: '" + location + "'.", e);
+ throw new RuntimeException(e);
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/org/onap/dcae/restapi/VesRestController.java b/src/main/java/org/onap/dcae/restapi/VesRestController.java
new file mode 100644
index 00000000..b7fc5f3b
--- /dev/null
+++ b/src/main/java/org/onap/dcae/restapi/VesRestController.java
@@ -0,0 +1,214 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.s
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.restapi;
+
+import static java.util.Optional.ofNullable;
+import static java.util.stream.StreamSupport.stream;
+import static org.springframework.http.ResponseEntity.accepted;
+import static org.springframework.http.ResponseEntity.ok;
+
+import com.att.nsa.clock.SaClock;
+import com.att.nsa.logging.LoggingContext;
+import com.att.nsa.logging.log4j.EcompFields;
+import com.github.fge.jackson.JsonLoader;
+import com.github.fge.jsonschema.core.report.ProcessingReport;
+import com.github.fge.jsonschema.main.JsonSchema;
+
+import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import javax.servlet.http.HttpServletRequest;
+
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.onap.dcae.ApplicationSettings;
+import org.onap.dcae.CollectorSchemas;
+import org.onap.dcae.commonFunction.VESLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+public class VesRestController {
+
+ private static final Logger LOG = LoggerFactory.getLogger(VesRestController.class);
+
+ private static final String FALLBACK_VES_VERSION = "v5";
+
+ @Autowired
+ private ApplicationSettings collectorProperties;
+
+ @Autowired
+ private CollectorSchemas schemas;
+
+ @Autowired
+ @Qualifier("metriclog")
+ private Logger metriclog;
+
+ @Autowired
+ @Qualifier("incomingRequestsLogger")
+ private Logger incomingRequestsLogger;
+
+ @Autowired
+ @Qualifier("errorLog")
+ private Logger errorLog;
+
+ private LinkedBlockingQueue<JSONObject> inputQueue;
+ private String version;
+
+ @Autowired
+ VesRestController(@Qualifier("incomingRequestsLogger") Logger incomingRequestsLogger,
+ @Qualifier("inputQueue") LinkedBlockingQueue<JSONObject> inputQueue) {
+ this.incomingRequestsLogger = incomingRequestsLogger;
+ this.inputQueue = inputQueue;
+ }
+
+ @GetMapping("/")
+ String mainPage() {
+ return "Welcome to VESCollector";
+ }
+
+ //refactor in next iteration
+ @PostMapping(value = {"/eventListener/v1",
+ "/eventListener/v1/eventBatch",
+ "/eventListener/v2",
+ "/eventListener/v2/eventBatch",
+ "/eventListener/v3",
+ "/eventListener/v3/eventBatch",
+ "/eventListener/v4",
+ "/eventListener/v4/eventBatch",
+ "/eventListener/v5",
+ "/eventListener/v5/eventBatch"}, consumes = "application/json")
+ ResponseEntity<String> receiveEvent(@RequestBody String jsonPayload, HttpServletRequest httpServletRequest) {
+ String request = httpServletRequest.getRequestURI();
+ extractVersion(request);
+
+ JSONObject jsonObject;
+ try {
+ jsonObject = new JSONObject(jsonPayload);
+ } catch (Exception e) {
+ return ResponseEntity.badRequest().body(ApiException.INVALID_JSON_INPUT.toJSON().toString());
+ }
+
+ String uuid = setUpECOMPLoggingForRequest();
+ incomingRequestsLogger.info(String.format(
+ "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'",
+ jsonObject, uuid, version, httpServletRequest.getRemoteHost()));
+
+ if (collectorProperties.jsonSchemaValidationEnabled()) {
+ if (isBatchRequest(request) && (jsonObject.has("eventList") && (!jsonObject.has("event")))) {
+ if (!conformsToSchema(jsonObject, version)) {
+ return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
+ }
+ } else if (!isBatchRequest(request) && (!jsonObject.has("eventList") && (jsonObject.has("event")))) {
+ if (!conformsToSchema(jsonObject, version)) {
+ return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
+ }
+ } else {
+ return errorResponse(ApiException.INVALID_JSON_INPUT);
+ }
+ }
+
+ JSONArray commonlyFormatted = convertToJSONArrayCommonFormat(jsonObject, request, uuid, version);
+
+ if (!putEventsOnProcessingQueue(commonlyFormatted)) {
+ errorLog.error("EVENT_RECEIPT_FAILURE: QueueFull " + ApiException.NO_SERVER_RESOURCES);
+ return errorResponse(ApiException.NO_SERVER_RESOURCES);
+ }
+ return ok().contentType(MediaType.APPLICATION_JSON).body("Message Accepted");
+ }
+
+ private void extractVersion(String httpServletRequest) {
+ version = httpServletRequest.split("/")[2];
+ }
+
+ private ResponseEntity<String> errorResponse(ApiException noServerResources) {
+ return ResponseEntity.status(noServerResources.httpStatusCode)
+ .body(noServerResources.toJSON().toString());
+ }
+
+ private boolean putEventsOnProcessingQueue(JSONArray arrayOfEvents) {
+ for (int i = 0; i < arrayOfEvents.length(); i++) {
+ metriclog.info("EVENT_PUBLISH_START");
+ if (!inputQueue.offer((JSONObject) arrayOfEvents.get(i))) {
+ return false;
+ }
+ }
+ LOG.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
+ metriclog.info("EVENT_PUBLISH_END");
+ return true;
+ }
+
+ private boolean conformsToSchema(JSONObject payload, String version) {
+ try {
+ JsonSchema schema = ofNullable(schemas.getJSONSchemasMap(version).get(version))
+ .orElse(schemas.getJSONSchemasMap(version).get(FALLBACK_VES_VERSION));
+ ProcessingReport report = schema.validate(JsonLoader.fromString(payload.toString()));
+ if (!report.isSuccess()) {
+ LOG.warn("Schema validation failed for event: " + payload);
+ stream(report.spliterator(), false).forEach(e -> LOG.warn(e.getMessage()));
+ return false;
+ }
+ return report.isSuccess();
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to validate against schema", e);
+ }
+ }
+
+ private static JSONArray convertToJSONArrayCommonFormat(JSONObject jsonObject, String request,
+ String uuid, String version) {
+ JSONArray asArrayEvents = new JSONArray();
+ String vesUniqueIdKey = "VESuniqueId";
+ String vesVersionKey = "VESversion";
+ if (isBatchRequest(request)) {
+ JSONArray events = jsonObject.getJSONArray("eventList");
+ for (int i = 0; i < events.length(); i++) {
+ JSONObject event = new JSONObject().put("event", events.getJSONObject(i));
+ event.put(vesUniqueIdKey, uuid + "-" + i);
+ event.put(vesVersionKey, version);
+ asArrayEvents.put(event);
+ }
+ } else {
+ jsonObject.put(vesUniqueIdKey, uuid);
+ jsonObject.put(vesVersionKey, version);
+ asArrayEvents = new JSONArray().put(jsonObject);
+ }
+ return asArrayEvents;
+ }
+
+ private static String setUpECOMPLoggingForRequest() {
+ final UUID uuid = UUID.randomUUID();
+ LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
+ localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
+ return uuid.toString();
+ }
+
+ private static boolean isBatchRequest(String request) {
+ return request.contains("eventBatch");
+ }
+} \ No newline at end of file
diff --git a/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java b/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java
deleted file mode 100644
index d60e2a11..00000000
--- a/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.restapi.endpoints;
-
-import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;
-import com.att.nsa.clock.SaClock;
-import com.att.nsa.drumlin.service.framework.context.DrumlinRequestContext;
-import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
-import com.att.nsa.drumlin.service.standards.MimeTypes;
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.log4j.EcompFields;
-import com.att.nsa.security.db.simple.NsaSimpleApiKey;
-import com.google.gson.JsonParser;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.util.Base64;
-import java.util.UUID;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
-import org.onap.dcae.commonFunction.CommonStartup;
-import org.onap.dcae.commonFunction.CommonStartup.QueueFullException;
-import org.onap.dcae.commonFunction.VESLogger;
-import org.onap.dcae.restapi.ApiException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EventReceipt extends NsaBaseEndpoint {
-
- private static final Logger log = LoggerFactory.getLogger(EventReceipt.class);
- private static final String MESSAGE = " Message:";
-
- public static void receiveVESEvent(DrumlinRequestContext ctx) {
- // the request body carries events. assume for now it's an array
- // of json objects that fits in memory. (See cambria's parsing for
- // handling large messages)
-
- NsaSimpleApiKey retkey = null;
-
-
- JSONObject jsonObject;
- InputStream istr = null;
- int arrayFlag = 0;
- String vesVersion = null;
- String userId=null;
-
- try {
-
-
- istr = ctx.request().getBodyStream();
- jsonObject = new JSONObject(new JSONTokener(istr));
-
- log.info("ctx getPathInContext: " + ctx.request().getPathInContext());
- Pattern p = Pattern.compile("(v\\d+)");
- Matcher m = p.matcher(ctx.request().getPathInContext());
-
- if (m.find()) {
- log.info("VES version:" + m.group());
- vesVersion = m.group();
- }
-
- final UUID uuid = UUID.randomUUID();
- LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
- localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
-
- if (ctx.request().getPathInContext().contains("eventBatch")) {
- CommonStartup.inlog.info(ctx.request().getRemoteAddress() + "VESUniqueID-Prefix:" + uuid
- + " VES Batch Input Messsage: " + jsonObject);
- log.info(ctx.request().getRemoteAddress() + "VESUniqueID-Prefix:" + uuid + " VES Batch Input Messsage: "
- + jsonObject);
- arrayFlag = 1;
- } else {
- CommonStartup.inlog.info(
- ctx.request().getRemoteAddress() + "VESUniqueID:" + uuid + " Input Messsage: " + jsonObject);
- log.info(ctx.request().getRemoteAddress() + "VESUniqueID:" + uuid + " Input Messsage: " + jsonObject);
-
- }
-
- try {
- if (CommonStartup.authflag) {
- userId = getUser (ctx);
- retkey = NsaBaseEndpoint.getAuthenticatedUser(ctx);
- }
- } catch (NullPointerException x) {
- //log.info("Invalid user request :" + userId + " FROM " + ctx.request().getRemoteAddress() + " " + ctx.request().getContentType() + MESSAGE + jsonObject);
- log.info(String.format("Unauthorized request %s FROM %s %s %s %s", getUser(ctx), ctx.request().getRemoteAddress(), ctx.request().getContentType(), MESSAGE, jsonObject));
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Unauthorized user" + userId + x);
- respondWithCustomMsginJson(ctx, ApiException.UNAUTHORIZED_USER);
- return;
- }
-
- if (schemaCheck(retkey, arrayFlag, jsonObject, vesVersion, ctx, uuid)) {
- return;
- }
-
- } catch (JSONException | NullPointerException | IOException x) {
- log.error(String.format("Couldn't parse JSON Array - HttpStatusCodes.k400_badRequest%d%s%s",
- HttpStatusCodes.k400_badRequest, MESSAGE, x.getMessage()));
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request " + x);
- respondWithCustomMsginJson(ctx, ApiException.INVALID_JSON_INPUT);
- return;
- } catch (QueueFullException e) {
- log.error("Collector internal queue full :" + e.getMessage(), e);
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e);
- respondWithCustomMsginJson(ctx, ApiException.NO_SERVER_RESOURCES);
- return;
- } finally {
- if (istr != null) {
- safeClose(istr);
- }
- }
- log.info("MessageAccepted and k200_ok to be sent");
- ctx.response().sendErrorAndBody(HttpStatusCodes.k200_ok, "Message Accepted", MimeTypes.kAppJson);
- }
-
-
- private static String getUser(DrumlinRequestContext ctx){
- String authorization = ctx.request().getFirstHeader("Authorization");
- if (authorization != null && authorization.startsWith("Basic")) {
- String base64Credentials = authorization.substring("Basic".length()).trim();
- String credentials = new String(Base64.getDecoder().decode(base64Credentials),
- Charset.forName("UTF-8"));
- final String[] values = credentials.split(":",2);
- log.debug("User:" + values[0] + " Pwd:" + values[1]);
- return values[0];
- }
- return null;
-
- }
-
- private static Boolean schemaCheck(NsaSimpleApiKey retkey, int arrayFlag,
- JSONObject jsonObject, String vesVersion,
- DrumlinRequestContext ctx, UUID uuid)
- throws JSONException, QueueFullException, IOException {
-
- JSONArray jsonArray;
- JSONArray jsonArrayMod = new JSONArray();
- JSONObject event;
- FileReader fr;
- if (retkey != null || !CommonStartup.authflag) {
- if (CommonStartup.schemaValidatorflag) {
- if ((arrayFlag == 1) && (jsonObject.has("eventList") && (!jsonObject.has("event")))
- || ((arrayFlag == 0) && (!jsonObject.has("eventList") && (jsonObject.has("event"))))) {
- fr = new FileReader(schemaFileVersion(vesVersion));
- String schema = new JsonParser().parse(fr).toString();
-
- String valresult = CommonStartup.validateAgainstSchema(jsonObject.toString(), schema);
- switch (valresult) {
- case "true":
- log.info("Validation successful");
- break;
- case "false":
- log.info("Validation failed");
- respondWithCustomMsginJson(ctx, ApiException.SCHEMA_VALIDATION_FAILED);
- return true;
- default:
- log.error("Validation errored" + valresult);
- respondWithCustomMsginJson(ctx, ApiException.INVALID_JSON_INPUT);
- return true;
- }
- } else {
- log.info("Validation failed");
- respondWithCustomMsginJson(ctx, ApiException.SCHEMA_VALIDATION_FAILED);
- return true;
- }
- if (arrayFlag == 1) {
- jsonArray = jsonObject.getJSONArray("eventList");
- log.info("Validation successful for all events in batch");
- for (int i = 0; i < jsonArray.length(); i++) {
- event = new JSONObject().put("event", jsonArray.getJSONObject(i));
- event.put("VESuniqueId", uuid + "-" + i);
- event.put("VESversion", vesVersion);
- jsonArrayMod.put(event);
- }
- log.info("Modified jsonarray:" + jsonArrayMod.toString());
- } else {
- jsonObject.put("VESuniqueId", uuid);
- jsonObject.put("VESversion", vesVersion);
- jsonArrayMod = new JSONArray().put(jsonObject);
- }
- }
-
- // reject anything that's not JSON
- if (!ctx.request().getContentType().equalsIgnoreCase("application/json")) {
- log.info(String.format("Rejecting request with content type %s Message:%s",
- ctx.request().getContentType(), jsonObject));
- respondWithCustomMsginJson(ctx, ApiException.INVALID_CONTENT_TYPE);
- return true;
- }
-
- CommonStartup.handleEvents(jsonArrayMod);
- } else {
- log.info(String.format("Unauthorized request %s FROM %s %s %s %s", getUser(ctx), ctx.request().getRemoteAddress(), ctx.request().getContentType(), MESSAGE,
- jsonObject));
- respondWithCustomMsginJson(ctx, ApiException.UNAUTHORIZED_USER);
- return true;
- }
- return false;
- }
-
- private static void respondWithCustomMsginJson(DrumlinRequestContext ctx, ApiException apiException) {
- ctx.response()
- .sendErrorAndBody(apiException.httpStatusCode,
- apiException.toJSON().toString(), MimeTypes.kAppJson);
- }
-
- private static void safeClose(InputStream is) {
- if (is != null) {
- try {
- is.close();
- } catch (IOException e) {
- log.error("Error closing Input stream : " + e);
- }
- }
-
- }
-
- public static String schemaFileVersion(String version) {
- return CommonStartup.schemaFileJson.has(version) ?
- CommonStartup.schemaFileJson.getString(version) : CommonStartup.schemaFileJson.getString("v5");
- }
-
-}
-