diff options
author | PawelSzalapski <pawel.szalapski@nokia.com> | 2018-07-31 08:18:03 +0200 |
---|---|---|
committer | PawelSzalapski <pawel.szalapski@nokia.com> | 2018-08-01 09:56:00 +0200 |
commit | fc073344d4c0eb8a28bf34c07a8439176cf846ca (patch) | |
tree | 01f5b4789c3d9369eaebb54a9f910a9fa400af1f /src/main/java/org | |
parent | d12cd3525284cc41414d8fdae09e2ffbc03a1fbb (diff) |
Replace nsaCore library with Spring
Change-Id: I2227939a67a2cbba2d392136d49ef4419600d186
Issue-ID: DCAEGEN2-602
Signed-off-by: PawelSzalapski <pawel.szalapski@nokia.com>
Diffstat (limited to 'src/main/java/org')
25 files changed, 1644 insertions, 1582 deletions
diff --git a/src/main/java/org/onap/dcae/ApplicationSettings.java b/src/main/java/org/onap/dcae/ApplicationSettings.java index 0ebd1e90..9063faa4 100644 --- a/src/main/java/org/onap/dcae/ApplicationSettings.java +++ b/src/main/java/org/onap/dcae/ApplicationSettings.java @@ -21,41 +21,50 @@ package org.onap.dcae; -import com.att.nsa.drumlin.till.nv.impl.nvReadableStack; -import com.att.nsa.drumlin.till.nv.impl.nvReadableTable; -import com.att.nsa.drumlin.till.nv.rrNvReadable; import com.google.common.annotations.VisibleForTesting; import io.vavr.Function1; import io.vavr.collection.HashMap; +import io.vavr.collection.List; import io.vavr.collection.Map; -import org.apache.commons.configuration.ConfigurationConverter; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; +import javax.annotation.Nullable; import java.io.File; import java.nio.file.Paths; +import java.util.Base64; + +import static java.util.Arrays.stream; /** * Abstraction over application configuration. * Its job is to provide easily discoverable (by method names lookup) and type safe access to configuration properties. */ +@Component public class ApplicationSettings { private static final Logger inlog = LoggerFactory.getLogger(ApplicationSettings.class); private static final String COLLECTOR_PROPERTIES = "etc/collector.properties"; + + private final String appInvocationDir; private final PropertiesConfiguration properties = new PropertiesConfiguration(); public ApplicationSettings(String[] args, Function1<String[], Map<String, String>> argsParser) { + this(args, argsParser, System.getProperty("user.dir")); + } + + public ApplicationSettings(String[] args, Function1<String[], Map<String, String>> argsParser, String appInvocationDir) { + this.appInvocationDir = appInvocationDir; properties.setDelimiterParsingDisabled(true); Map<String, String> parsedArgs = argsParser.apply(args); loadProperties(Paths.get(new File(COLLECTOR_PROPERTIES).getAbsolutePath()).toString()); loadCommandLineProperties(parsedArgs); parsedArgs.filterKeys(k -> !k.equals("c")).forEach(this::updateProperty); } - private void loadCommandLineProperties(Map<String, String> parsedArgs) { parsedArgs.get("c").forEach(e -> { properties.clear(); @@ -63,7 +72,7 @@ public class ApplicationSettings { }); } - private void loadProperties(String property){ + private void loadProperties(String property) { try { properties.load(property); } catch (ConfigurationException ex) { @@ -72,8 +81,13 @@ public class ApplicationSettings { } } - public String validAuthorizationCredentials() { - return properties.getString("header.authlist", null); + public Map<String, String> validAuthorizationCredentials() { + return prepareUsersMap(properties.getString("header.authlist", null)); + } + + private Map<String, String> prepareUsersMap(@Nullable String allowedUsers) { + return allowedUsers == null ? HashMap.empty() : List.ofAll(stream(allowedUsers.split("\\|"))) + .toMap(t -> t.split(",")[0].trim(), t -> new String(Base64.getDecoder().decode(t.split(",")[1])).trim()); } public int maximumAllowedQueuedEvents() { @@ -110,11 +124,11 @@ public class ApplicationSettings { } public String keystorePasswordFileLocation() { - return properties.getString("collector.keystore.passwordfile", "./etc/passwordfile"); + return prependWithUserDirOnRelative(properties.getString("collector.keystore.passwordfile", "etc/passwordfile")); } public String keystoreFileLocation() { - return properties.getString("collector.keystore.file.location", "../etc/keystore"); + return prependWithUserDirOnRelative(properties.getString("collector.keystore.file.location", "etc/keystore")); } public String keystoreAlias() { @@ -126,7 +140,7 @@ public class ApplicationSettings { } public String cambriaConfigurationFileLocation() { - return properties.getString("collector.dmaapfile", "./etc/DmaapConfig.json"); + return prependWithUserDirOnRelative(properties.getString("collector.dmaapfile", "etc/DmaapConfig.json")); } public Map<String, String[]> dMaaPStreamsMapping() { @@ -138,17 +152,6 @@ public class ApplicationSettings { } } - /* - * Kept back here for backward compatibility. - * RestfulCollectorServlet upon its initialization requires options to be represented - * as object represented by rrNvReadable interface, so we define a a handy transformation function here. - */ - public rrNvReadable torrNvReadable() { - final nvReadableStack settings = new nvReadableStack(); - settings.push(new nvReadableTable(ConfigurationConverter.getProperties(properties))); - return settings; - } - private Map<String, String[]> convertDMaaPStreamsPropertyToMap(String streamIdsProperty) { java.util.HashMap<String, String[]> domainToStreamIdsMapping = new java.util.HashMap<>(); String[] topics = streamIdsProperty.split("\\|"); @@ -168,6 +171,13 @@ public class ApplicationSettings { } } + public String prependWithUserDirOnRelative(String filePath) { + if (!Paths.get(filePath).isAbsolute()) { + filePath = Paths.get(appInvocationDir, filePath).toString(); + } + return filePath; + } + @VisibleForTesting String getStringDirectly(String key) { return properties.getString(key); diff --git a/src/main/java/org/onap/dcae/CLIUtils.java b/src/main/java/org/onap/dcae/CLIUtils.java index 6450d2e5..6764d5b2 100644 --- a/src/main/java/org/onap/dcae/CLIUtils.java +++ b/src/main/java/org/onap/dcae/CLIUtils.java @@ -24,36 +24,29 @@ package org.onap.dcae; import java.util.HashMap; /** - * CLIUtils extracted from nsaServerLibrary this implementation will be removed once we switch to different API library + * CLIUtils extracted from nsaServerLibrary this implementation will be removed once we switch to different API library */ public class CLIUtils { - public static io.vavr.collection.HashMap<String, String> processCmdLine (String[] args) { - final HashMap<String,String> map = new HashMap<String,String> (); + public static io.vavr.collection.HashMap<String, String> processCmdLine(String[] args) { + final HashMap<String, String> map = new HashMap<String, String>(); String lastKey = null; - for ( String arg : args ) - { - if ( arg.startsWith ( "-" ) ) - { - if ( lastKey != null ) - { - map.put ( lastKey.substring(1), "" ); + for (String arg : args) { + if (arg.startsWith("-")) { + if (lastKey != null) { + map.put(lastKey.substring(1), ""); } lastKey = arg; - } - else - { - if ( lastKey != null ) - { - map.put ( lastKey.substring(1), arg ); + } else { + if (lastKey != null) { + map.put(lastKey.substring(1), arg); } lastKey = null; } } - if ( lastKey != null ) - { - map.put ( lastKey.substring(1), "" ); + if (lastKey != null) { + map.put(lastKey.substring(1), ""); } return io.vavr.collection.HashMap.ofAll(map); } diff --git a/src/main/java/org/onap/dcae/CollectorSchemas.java b/src/main/java/org/onap/dcae/CollectorSchemas.java new file mode 100644 index 00000000..fc12b1f9 --- /dev/null +++ b/src/main/java/org/onap/dcae/CollectorSchemas.java @@ -0,0 +1,74 @@ +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved.s + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae; + +import static java.nio.file.Files.readAllBytes; +import static java.util.stream.Collectors.toMap; + +import com.fasterxml.jackson.databind.JsonNode; +import com.github.fge.jackson.JsonLoader; +import com.github.fge.jsonschema.core.exceptions.ProcessingException; +import com.github.fge.jsonschema.main.JsonSchema; +import com.github.fge.jsonschema.main.JsonSchemaFactory; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.AbstractMap; +import java.util.Map; +import org.json.JSONObject; +import org.onap.dcae.restapi.VesRestController; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class CollectorSchemas { + + private static final Logger LOG = (Logger) LoggerFactory.getLogger(VesRestController.class); + + @Autowired + private ApplicationSettings collectorProperties; + + //refactor is needed in next iteration + public Map<String, JsonSchema> getJSONSchemasMap(String version) { + JSONObject jsonObject = collectorProperties.jsonSchema(); + Map<String, JsonSchema> schemas = jsonObject.toMap().entrySet().stream().map( + versionToFilePath -> { + try { + String schemaContent = new String( + readAllBytes(Paths.get(versionToFilePath.getValue().toString()))); + JsonNode schemaNode = JsonLoader.fromString(schemaContent); + JsonSchema schema = JsonSchemaFactory.byDefault().getJsonSchema(schemaNode); + return new AbstractMap.SimpleEntry<>(versionToFilePath.getKey(), schema); + } catch (IOException | ProcessingException e) { + LOG.error("Could not read schema from path: " + versionToFilePath.getValue(), e); + throw new RuntimeException( + "Could not read schema from path: " + versionToFilePath.getValue(), e); + } + } + ).collect(toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue)); + if (schemas.get(version) == null && collectorProperties.eventTransformingEnabled()) { + LOG.error(String.format("Missing necessary %s JSON schema", version)); + throw new RuntimeException(String.format("Missing necessary %s JSON schema", version)); + } + return schemas; + } +}
\ No newline at end of file diff --git a/src/main/java/org/onap/dcae/SchemaValidator.java b/src/main/java/org/onap/dcae/SchemaValidator.java new file mode 100644 index 00000000..e4b52cfb --- /dev/null +++ b/src/main/java/org/onap/dcae/SchemaValidator.java @@ -0,0 +1,77 @@ +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved.s + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonNode; +import com.github.fge.jackson.JsonLoader; +import com.github.fge.jsonschema.core.exceptions.ProcessingException; +import com.github.fge.jsonschema.core.report.ProcessingMessage; +import com.github.fge.jsonschema.core.report.ProcessingReport; +import com.github.fge.jsonschema.main.JsonSchema; +import com.github.fge.jsonschema.main.JsonSchemaFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class SchemaValidator { + + private static final Logger log = LoggerFactory.getLogger(SchemaValidator.class); + + //refactor in next iteration + public static String validateAgainstSchema(String jsonData, String jsonSchema) { + ProcessingReport report; + String result = "false"; + + try { + log.trace("Schema validation for event:" + jsonData); + JsonNode schemaNode = JsonLoader.fromString(jsonSchema); + JsonNode data = JsonLoader.fromString(jsonData); + JsonSchemaFactory factory = JsonSchemaFactory.byDefault(); + JsonSchema schema = factory.getJsonSchema(schemaNode); + report = schema.validate(data); + } catch (JsonParseException e) { + log.error("validateAgainstSchema:JsonParseException for event:" + jsonData); + return e.getMessage(); + } catch (ProcessingException e) { + log.error("validateAgainstSchema:Processing exception for event:" + jsonData); + return e.getMessage(); + } catch (IOException e) { + log.error( + "validateAgainstSchema:IO exception; something went wrong trying to read json data for event:" + jsonData); + return e.getMessage(); + } + if (report != null) { + for (ProcessingMessage pm : report) { + log.trace("Processing Message: " + pm.getMessage()); + } + result = String.valueOf(report.isSuccess()); + } + try { + log.debug("Validation Result:" + result + " Validation report:" + report); + } catch (NullPointerException e) { + log.error("validateAgainstSchema:NullpointerException on report"); + } + return result; + } +} diff --git a/src/main/java/org/onap/dcae/VesApplication.java b/src/main/java/org/onap/dcae/VesApplication.java new file mode 100644 index 00000000..86b8ccb0 --- /dev/null +++ b/src/main/java/org/onap/dcae/VesApplication.java @@ -0,0 +1,110 @@ +/*- + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae; + +import io.vavr.collection.Map; +import org.json.JSONObject; +import org.onap.dcae.commonFunction.EventProcessor; +import org.onap.dcae.commonFunction.event.publishing.DMaaPConfigurationParser; +import org.onap.dcae.commonFunction.event.publishing.EventPublisher; +import org.onap.dcae.commonFunction.event.publishing.PublisherConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration; +import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Lazy; + +import java.nio.file.Paths; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +@SpringBootApplication +@EnableAutoConfiguration(exclude = {GsonAutoConfiguration.class, SecurityAutoConfiguration.class}) +public class VesApplication { + + private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics"); + private static final Logger incomingRequestsLogger = LoggerFactory.getLogger("org.onap.dcae.commonFunction.input"); + private static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.output"); + private static final Logger errorLog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.error"); + private static final int MAX_THREADS = 20; + public static LinkedBlockingQueue<JSONObject> fProcessingInputQueue; + private static ApplicationSettings properties; + + public static void main(String[] args) { + SpringApplication app = new SpringApplication(VesApplication.class); + + properties = new ApplicationSettings(args, CLIUtils::processCmdLine); + + fProcessingInputQueue = new LinkedBlockingQueue<>(properties.maximumAllowedQueuedEvents()); + + app.setAddCommandLineProperties(true); + app.run(); + + EventProcessor ep = new EventProcessor(EventPublisher.createPublisher(oplog, getDmapConfig()), properties); + + ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS); + for (int i = 0; i < MAX_THREADS; ++i) { + executor.execute(ep); + } + } + + + private static Map<String, PublisherConfig> getDmapConfig() { + return DMaaPConfigurationParser. + parseToDomainMapping(Paths.get(properties.cambriaConfigurationFileLocation())).get(); + } + + @Bean + @Lazy + public ApplicationSettings applicationSettings() { + return properties; + } + + @Bean + @Qualifier("incomingRequestsLogger") + public Logger incomingRequestsLogger() { + return incomingRequestsLogger; + } + + @Bean + @Qualifier("metriclog") + public Logger incomingRequestsMetricsLogger() { + return metriclog; + } + + @Bean + @Qualifier("errorLog") + public Logger errorLogger() { + return errorLog; + } + + @Bean + public LinkedBlockingQueue<JSONObject> inputQueue() { + return fProcessingInputQueue; + } + +} diff --git a/src/main/java/org/onap/dcae/commonFunction/AnyNode.java b/src/main/java/org/onap/dcae/commonFunction/AnyNode.java index 97d73ddd..7be45b0c 100644 --- a/src/main/java/org/onap/dcae/commonFunction/AnyNode.java +++ b/src/main/java/org/onap/dcae/commonFunction/AnyNode.java @@ -19,16 +19,17 @@ */ package org.onap.dcae.commonFunction; -import static io.vavr.API.Set; - import io.vavr.collection.List; import io.vavr.collection.Set; import io.vavr.control.Option; -import java.util.stream.StreamSupport; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; +import java.util.stream.StreamSupport; + +import static io.vavr.API.Set; + /** * This class is a wrapper for 2 most used entities of org.json lib: JSONArray and JSONObject and comprises utility * methods for fast access of json structures without need to explicitly coerce between them. While using this, bear in @@ -109,5 +110,4 @@ public class AnyNode { return (JSONObject) this.obj; } - } diff --git a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java deleted file mode 100644 index 36713aa4..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java +++ /dev/null @@ -1,204 +0,0 @@ -/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.commonFunction;
-
-import com.att.nsa.apiServer.ApiServer;
-import com.att.nsa.apiServer.ApiServerConnector;
-import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.loadException;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.github.fge.jackson.JsonLoader;
-import com.github.fge.jsonschema.core.exceptions.ProcessingException;
-import com.github.fge.jsonschema.core.report.ProcessingMessage;
-import com.github.fge.jsonschema.core.report.ProcessingReport;
-import com.github.fge.jsonschema.main.JsonSchema;
-import com.github.fge.jsonschema.main.JsonSchemaFactory;
-import org.apache.catalina.LifecycleException;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.onap.dcae.ApplicationSettings;
-import org.onap.dcae.CLIUtils;
-import org.onap.dcae.commonFunction.event.publishing.DMaaPConfigurationParser;
-import org.onap.dcae.commonFunction.event.publishing.EventPublisher;
-import org.onap.dcae.restapi.RestfulCollectorServlet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-
-public class CommonStartup extends NsaBaseEndpoint implements Runnable {
-
- private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");
- public static final Logger inlog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.input");
- static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.output");
- public static final Logger eplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.error");
-
- static int maxQueueEvent = 1024 * 4;
- public static boolean schemaValidatorflag = false;
- public static boolean authflag = false;
- static boolean eventTransformFlag = true;
- public static JSONObject schemaFileJson;
- static String cambriaConfigFile;
- public static io.vavr.collection.Map<String , String [] > streamID;
-
- static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;
- private static ApiServer fTomcatServer = null;
- private static final Logger log = LoggerFactory.getLogger(CommonStartup.class);
-
- private CommonStartup(ApplicationSettings settings) throws loadException, IOException, rrNvReadable.missingReqdSetting {
- final List<ApiServerConnector> connectors = new LinkedList<>();
-
- if (!settings.authorizationEnabled()) {
- connectors.add(new ApiServerConnector.Builder(settings.httpPort()).secure(false).build());
- }
-
- final int securePort = settings.httpsPort();
- final String keystoreFile = settings.keystoreFileLocation();
- final String keystorePasswordFile = settings.keystorePasswordFileLocation();
- final String keyAlias = settings.keystoreAlias();
-
- if (settings.authorizationEnabled()) {
- String keystorePassword = readFile(keystorePasswordFile);
- connectors.add(new ApiServerConnector.Builder(securePort).secure(true)
- .keystorePassword(keystorePassword).keystoreFile(keystoreFile).keyAlias(keyAlias).build());
-
- }
-
- schemaValidatorflag = settings.jsonSchemaValidationEnabled();
- maxQueueEvent = settings.maximumAllowedQueuedEvents();
- if (schemaValidatorflag) {
- schemaFileJson = settings.jsonSchema();
-
- }
- authflag = settings.authorizationEnabled();
- cambriaConfigFile = settings.cambriaConfigurationFileLocation();
- streamID = settings.dMaaPStreamsMapping();
- eventTransformFlag = settings.eventTransformingEnabled();
-
- fTomcatServer = new ApiServer.Builder(connectors, new RestfulCollectorServlet(settings)).encodeSlashes(true)
- .name("collector").build();
- }
-
- public static void main(String[] args) {
- try {
-
- fProcessingInputQueue = new LinkedBlockingQueue<>(CommonStartup.maxQueueEvent);
-
- VESLogger.setUpEcompLogging();
-
- CommonStartup cs = new CommonStartup(new ApplicationSettings(args, CLIUtils::processCmdLine));
-
- Thread commonStartupThread = new Thread(cs);
- commonStartupThread.start();
-
- EventProcessor ep = new EventProcessor(EventPublisher.createPublisher(oplog,
- DMaaPConfigurationParser
- .parseToDomainMapping(Paths.get(cambriaConfigFile))
- .get()));
- ExecutorService executor = Executors.newFixedThreadPool(20);
- for (int i = 0; i < 20; ++i) {
- executor.execute(ep);
- }
- } catch (Exception e) {
- CommonStartup.eplog.error("Fatal error during application startup", e);
- throw new RuntimeException(e);
- }
- }
-
- public void run() {
- try {
- fTomcatServer.start();
- fTomcatServer.await();
- } catch (LifecycleException | IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public static class QueueFullException extends Exception {
-
- private static final long serialVersionUID = 1L;
- }
-
- public static void handleEvents(JSONArray a) throws QueueFullException, JSONException {
- CommonStartup.metriclog.info("EVENT_PUBLISH_START");
- for (int i = 0; i < a.length(); i++) {
- if (!fProcessingInputQueue.offer(a.getJSONObject(i))) {
- throw new QueueFullException();
- }
- }
- log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
- CommonStartup.metriclog.info("EVENT_PUBLISH_END");
- }
-
- private static String readFile(String path) throws IOException {
- byte[] encoded = Files.readAllBytes(Paths.get(path));
- String pwd = new String(encoded);
- return pwd.substring(0, pwd.length() - 1);
- }
-
- public static String validateAgainstSchema(String jsonData, String jsonSchema) {
- ProcessingReport report;
- String result = "false";
-
- try {
- log.trace("Schema validation for event:" + jsonData);
- JsonNode schemaNode = JsonLoader.fromString(jsonSchema);
- JsonNode data = JsonLoader.fromString(jsonData);
- JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
- JsonSchema schema = factory.getJsonSchema(schemaNode);
- report = schema.validate(data);
- } catch (JsonParseException e) {
- log.error("validateAgainstSchema:JsonParseException for event:" + jsonData);
- return e.getMessage();
- } catch (ProcessingException e) {
- log.error("validateAgainstSchema:Processing exception for event:" + jsonData);
- return e.getMessage();
- } catch (IOException e) {
- log.error(
- "validateAgainstSchema:IO exception; something went wrong trying to read json data for event:" + jsonData);
- return e.getMessage();
- }
- if (report != null) {
- for (ProcessingMessage pm : report) {
- log.trace("Processing Message: " + pm.getMessage());
- }
- result = String.valueOf(report.isSuccess());
- }
- try {
- log.debug("Validation Result:" + result + " Validation report:" + report);
- } catch (NullPointerException e) {
- log.error("validateAgainstSchema:NullpointerException on report");
- }
- return result;
- }
-
-
-}
diff --git a/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java b/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java index a6de0fc8..e3d59098 100644 --- a/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java +++ b/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java @@ -8,9 +8,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,483 +21,483 @@ package org.onap.dcae.commonFunction; -import java.text.DecimalFormat; import org.json.JSONArray; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ConfigProcessors { - - private static final Logger log = LoggerFactory.getLogger(ConfigProcessors.class); - private static final String FIELD = "field"; - private static final String OLD_FIELD = "oldField"; - private static final String FILTER = "filter"; - private static final String VALUE = "value"; - private static final String REGEX = "\\[\\]"; - private static final String OBJECT_NOT_FOUND = "ObjectNotFound"; - private static final String FILTER_NOT_MET = "Filter not met"; - private static final String COMP_FALSE = "==false"; - - private final JSONObject event; - - public ConfigProcessors(JSONObject eventJson) { - event = eventJson; - } - - public void getValue(JSONObject jsonObject) { - - final String field = jsonObject.getString(FIELD); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - - if (filter == null || isFilterMet(filter)) { - getEventObjectVal(field); - } else - log.info(FILTER_NOT_MET); - } +import java.text.DecimalFormat; +public class ConfigProcessors { - public void setValue(JSONObject jsonObject) { - final String field = jsonObject.getString(FIELD); - final String value = jsonObject.getString(VALUE); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - if (filter == null || isFilterMet(filter)) { - setEventObjectVal(field, value); - } else - log.info(FILTER_NOT_MET); - } + private static final Logger log = LoggerFactory.getLogger(ConfigProcessors.class); + private static final String FIELD = "field"; + private static final String OLD_FIELD = "oldField"; + private static final String FILTER = "filter"; + private static final String VALUE = "value"; + private static final String REGEX = "\\[\\]"; + private static final String OBJECT_NOT_FOUND = "ObjectNotFound"; + private static final String FILTER_NOT_MET = "Filter not met"; + private static final String COMP_FALSE = "==false"; + private final JSONObject event; + public ConfigProcessors(JSONObject eventJson) { + event = eventJson; + } - private String evaluate(String str) { - String value = str; - if (str.startsWith("$")) { - value = (String) getEventObjectVal(str.substring(1)); + public void getValue(JSONObject jsonObject) { - } - return value; - } + final String field = jsonObject.getString(FIELD); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + if (filter == null || isFilterMet(filter)) { + getEventObjectVal(field); + } else + log.info(FILTER_NOT_MET); + } - public void suppressEvent(JSONObject jsonObject) { - final JSONObject filter = jsonObject.optJSONObject(FILTER); - if (filter == null || isFilterMet(filter)) { - setEventObjectVal("suppressEvent", "true"); - } else - log.info(FILTER_NOT_MET); - } + public void setValue(JSONObject jsonObject) { + final String field = jsonObject.getString(FIELD); + final String value = jsonObject.getString(VALUE); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + if (filter == null || isFilterMet(filter)) { + setEventObjectVal(field, value); + } else + log.info(FILTER_NOT_MET); + } - public void addAttribute(JSONObject jsonObject) { + private String evaluate(String str) { + String value = str; + if (str.startsWith("$")) { + value = (String) getEventObjectVal(str.substring(1)); + + } + return value; + } - final String field = jsonObject.getString(FIELD); - final String value = evaluate(jsonObject.getString(VALUE)); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - final String fieldType = jsonObject.optString("fieldType", "string").toLowerCase(); - if (filter == null || isFilterMet(filter)) { - setEventObjectVal(field, value, fieldType); - } else - log.info(FILTER_NOT_MET); - } + public void suppressEvent(JSONObject jsonObject) { + final JSONObject filter = jsonObject.optJSONObject(FILTER); + if (filter == null || isFilterMet(filter)) { + setEventObjectVal("suppressEvent", "true"); + } else + log.info(FILTER_NOT_MET); + } - public void updateAttribute(JSONObject jsonObject) { - final String field = jsonObject.getString(FIELD); - final String value = evaluate(jsonObject.getString(VALUE)); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - if (filter == null || isFilterMet(filter)) { - setEventObjectVal(field, value); - } else - log.info(FILTER_NOT_MET); - } + public void addAttribute(JSONObject jsonObject) { + + final String field = jsonObject.getString(FIELD); + final String value = evaluate(jsonObject.getString(VALUE)); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + final String fieldType = jsonObject.optString("fieldType", "string").toLowerCase(); + + if (filter == null || isFilterMet(filter)) { + setEventObjectVal(field, value, fieldType); + } else + log.info(FILTER_NOT_MET); + } - public void removeAttribute(JSONObject jsonObject) { - - final String field = jsonObject.getString(FIELD); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - - if (filter == null || isFilterMet(filter)) { - removeEventKey(field); - } else - log.info(FILTER_NOT_MET); - } - - - private void renameArrayInArray(JSONObject jsonObject) // map - { - log.info("renameArrayInArray"); - final String field = jsonObject.getString(FIELD); - final String oldField = jsonObject.getString(OLD_FIELD); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - - if (filter == null || isFilterMet(filter)) { - - final String[] fsplit = field.split(REGEX, field.length()); - final String[] oldfsplit = oldField.split(REGEX, oldField.length()); - - final String oldValue = getEventObjectVal(oldfsplit[0]).toString(); - if (!oldValue.equals(OBJECT_NOT_FOUND)) { - final String oldArrayName = oldfsplit[1].substring(1); - final String newArrayName = fsplit[1].substring(1); - final String value = oldValue.replaceAll(oldArrayName, newArrayName); - - log.info("oldValue ==" + oldValue); - log.info("value ==" + value); - JSONArray ja = new JSONArray(value); - removeEventKey(oldfsplit[0]); - setEventObjectVal(fsplit[0], ja); - } - } else - log.info(FILTER_NOT_MET); - } - - - public void map(JSONObject jsonObject) { - - final String field = jsonObject.getString(FIELD); - if (field.contains("[]")) { - if (field.matches(".*\\[\\]\\..*\\[\\]")) - renameArrayInArray(jsonObject); - else - mapToJArray(jsonObject); - } else - mapAttribute(jsonObject); - } - - private String performOperation(String operation, String value) { - log.info("performOperation"); - if ("convertMBtoKB".equals(operation)) { - float kbValue = Float.parseFloat(value) * 1024; - value = String.valueOf(kbValue); - } - return value; - } - - - public void mapAttribute(JSONObject jsonObject) { - - final String field = jsonObject.getString(FIELD); - final String oldField = jsonObject.getString(OLD_FIELD); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - final String operation = jsonObject.optString("operation"); - String value; - if (filter == null || isFilterMet(filter)) { - - value = getEventObjectVal(oldField).toString(); - if (!value.equals(OBJECT_NOT_FOUND)) { - if (operation != null && !operation.isEmpty()) - value = performOperation(operation, value); - - setEventObjectVal(field, value); - - removeEventKey(oldField); - } - } else - log.info(FILTER_NOT_MET); - } - - - private void mapToJArray(JSONObject jsonObject) { - log.info("mapToJArray"); - String field = jsonObject.getString(FIELD); - String oldField = jsonObject.getString(OLD_FIELD); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - final JSONObject attrMap = jsonObject.optJSONObject("attrMap"); - oldField = oldField.replaceAll(REGEX, ""); - field = field.replaceAll(REGEX, ""); - - if (filter == null || isFilterMet(filter)) { - - String value = getEventObjectVal(oldField).toString(); - if (!value.equals(OBJECT_NOT_FOUND)) { - log.info("old value ==" + value); - // update old value based on attrMap - if (attrMap != null) { - // loop thru attrMap and update attribute name to new name - for (String key : attrMap.keySet()) { - value = value.replaceAll(key, attrMap.getString(key)); - } - } - - log.info("new value ==" + value); - char c = value.charAt(0); - if (c != '[') { - // oldfield is JsonObject - JSONObject valueJO = new JSONObject(value); - // if the array already exists - String existingValue = getEventObjectVal(field).toString(); - if (!existingValue.equals(OBJECT_NOT_FOUND)) { - JSONArray ja = new JSONArray(existingValue); - JSONObject jo = ja.optJSONObject(0); - if (jo != null) { - for (String key : valueJO.keySet()) { - jo.put(key, valueJO.get(key)); - - } - ja.put(0, jo); - - setEventObjectVal(field, ja); - } - } else // if new array - setEventObjectVal(field + "[0]", new JSONObject(value), "JArray"); - } else // oldfield is jsonArray - setEventObjectVal(field, new JSONArray(value)); - - removeEventKey(oldField); - } - } else - log.info(FILTER_NOT_MET); - } - - /** - * example - { "functionName": "concatenateValue", "args":{ "filter": - * {"event.commonEventHeader.event":"heartbeat"}, - * FIELD:"event.commonEventHeader.eventName", "concatenate": - * ["event.commonEventHeader.domain","event.commonEventHeader.eventType","event.commonEventHeader.alarmCondition"], - * "delimiter":"_" } } - **/ - public void concatenateValue(JSONObject jsonObject) { - - final String field = jsonObject.getString(FIELD); - final String delimiter = jsonObject.getString("delimiter"); - final JSONArray values = jsonObject.getJSONArray("concatenate"); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - if (filter == null || isFilterMet(filter)) { - StringBuilder value = new StringBuilder(); - for (int i = 0; i < values.length(); i++) { - - String tempVal = evaluate(values.getString(i)); - if (!tempVal.equals(OBJECT_NOT_FOUND)) { - if (i == 0) - value.append(tempVal); - else - value.append(delimiter).append(tempVal); - } - } - - setEventObjectVal(field, value.toString()); - } else - log.info(FILTER_NOT_MET); - } - - public void subtractValue(JSONObject jsonObject) { - - final String field = jsonObject.getString(FIELD); - final JSONArray values = jsonObject.getJSONArray("subtract"); - final JSONObject filter = jsonObject.optJSONObject(FILTER); - if (filter == null || isFilterMet(filter)) { - float value = 0; - for (int i = 0; i < values.length(); i++) { - log.info(values.getString(i)); - String tempVal = evaluate(values.getString(i)); - log.info("tempVal==" + tempVal); - if (!tempVal.equals(OBJECT_NOT_FOUND)) { - if (i == 0) - value = value + Float.valueOf(tempVal); - else - value = value - Float.valueOf(tempVal); - } - } - log.info("value ==" + value); - setEventObjectVal(field, value, "number"); - } else - log.info(FILTER_NOT_MET); - } - - - private void removeEventKey(String field) { - String[] keySet = field.split("\\.", field.length()); - JSONObject keySeries = event; - for (int i = 0; i < (keySet.length - 1); i++) { - - keySeries = keySeries.getJSONObject(keySet[i]); - } - - keySeries.remove(keySet[keySet.length - 1]); - } - - - private boolean checkFilter(JSONObject jo, String key, String logicKey) { - String filterValue = jo.getString(key); - if (filterValue.contains(":")) { - String[] splitVal = filterValue.split(":"); - if ("matches".equals(splitVal[0])) { - if ("not".equals(logicKey)) { - if (getEventObjectVal(key).toString().matches(splitVal[1])) { - log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); - return false; - } - } else { - if (!(getEventObjectVal(key).toString().matches(splitVal[1]))) { - log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); - return false; - } - } - - } - if ("contains".equals(splitVal[0])) { - if ("not".equals(logicKey)) { - if (getEventObjectVal(key).toString().contains(splitVal[1])) { - log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); - return false; - } - } else { - if (!(getEventObjectVal(key).toString().contains(splitVal[1]))) { - log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); - return false; - } - } - - } - } else { - if ("not".equals(logicKey)) { - if (getEventObjectVal(key).toString().equals(filterValue)) { - log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); - return false; - } - } else { - if (!(getEventObjectVal(key).toString().equals(filterValue))) { - log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); - return false; - } - } - } - return true; - } - - - public boolean isFilterMet(JSONObject jo) { - for (String key : jo.keySet()) { - if ("not".equals(key)) { - JSONObject njo = jo.getJSONObject(key); - for (String njoKey : njo.keySet()) { - if (!checkFilter(njo, njoKey, key)) - return false; - } - } else { - if (!checkFilter(jo, key, key)) - return false; - } - } - return true; - } - - /** - * returns a string or JSONObject or JSONArray - **/ - public Object getEventObjectVal(String keySeriesStr) { - keySeriesStr = keySeriesStr.replaceAll("\\[", "."); - keySeriesStr = keySeriesStr.replaceAll("\\]", "."); - if (keySeriesStr.contains("..")) { - keySeriesStr = keySeriesStr.replaceAll("\\.\\.", "."); - } - - if (keySeriesStr.lastIndexOf(".") == keySeriesStr.length() - 1) - keySeriesStr = keySeriesStr.substring(0, keySeriesStr.length() - 1); - String[] keySet = keySeriesStr.split("\\.", keySeriesStr.length()); - Object keySeriesObj = event; - for (String aKeySet : keySet) { - if (keySeriesObj != null) { - if (keySeriesObj instanceof String) { - - log.info("STRING==" + keySeriesObj); - } else if (keySeriesObj instanceof JSONArray) { - keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(aKeySet)); - - } else if (keySeriesObj instanceof JSONObject) { - keySeriesObj = ((JSONObject) keySeriesObj).opt(aKeySet); - - } else { - log.info("unknown object==" + keySeriesObj); - } - } - } - - if (keySeriesObj == null) - return OBJECT_NOT_FOUND; - return keySeriesObj; - } - - public void setEventObjectVal(String keySeriesStr, Object value) { - setEventObjectVal(keySeriesStr, value, "string"); - } - - /** - * returns a string or JSONObject or JSONArray - **/ - public void setEventObjectVal(String keySeriesStr, Object value, String fieldType) { - keySeriesStr = keySeriesStr.replaceAll("\\[", "."); - keySeriesStr = keySeriesStr.replaceAll("\\]", "."); - if (keySeriesStr.contains("..")) { - keySeriesStr = keySeriesStr.replaceAll("\\.\\.", "."); - } - log.info("fieldType==" + fieldType); - - if (keySeriesStr.lastIndexOf(".") == keySeriesStr.length() - 1) - keySeriesStr = keySeriesStr.substring(0, keySeriesStr.length() - 1); - String[] keySet = keySeriesStr.split("\\.", keySeriesStr.length()); - Object keySeriesObj = event; - for (int i = 0; i < (keySet.length - 1); i++) { - - if (keySeriesObj instanceof JSONArray) { - - if (((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i])) == null) // if - // the - // object - // is - // not - // there - // then - // add - // it - { - log.info("Object is null, must add it"); - if (keySet[i + 1].matches("[0-9]*")) // if index then array - ((JSONArray) keySeriesObj).put(Integer.parseInt(keySet[i]), new JSONArray()); - else - ((JSONArray) keySeriesObj).put(Integer.parseInt(keySet[i]), new JSONObject()); - } - keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i])); - - } else if (keySeriesObj instanceof JSONObject) { - if (((JSONObject) keySeriesObj).opt(keySet[i]) == null) // if - // the - // object - // is - // not - // there - // then - // add - // it - { - if (keySet[i + 1].matches("[0-9]*")) // if index then array - ((JSONObject) keySeriesObj).put(keySet[i], new JSONArray()); - else - ((JSONObject) keySeriesObj).put(keySet[i], new JSONObject()); - log.info("Object is null, must add it"); - } - keySeriesObj = ((JSONObject) keySeriesObj).opt(keySet[i]); - } else { - log.info("unknown object==" + keySeriesObj); - } - } - if ("number".equals(fieldType)) { - DecimalFormat df = new DecimalFormat("#.0"); - if (value instanceof String) - ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], - Float.valueOf(df.format(Float.valueOf((String) value)))); - else - ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], Float.valueOf(df.format(value))); - } else if ("integer".equals(fieldType) && value instanceof String) - ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], Integer.valueOf((String) value)); - else if ("JArray".equals(fieldType)) - ((JSONArray) keySeriesObj).put(value); - else - ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], value); - - } + public void updateAttribute(JSONObject jsonObject) { + + final String field = jsonObject.getString(FIELD); + final String value = evaluate(jsonObject.getString(VALUE)); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + if (filter == null || isFilterMet(filter)) { + setEventObjectVal(field, value); + } else + log.info(FILTER_NOT_MET); + } + + + public void removeAttribute(JSONObject jsonObject) { + + final String field = jsonObject.getString(FIELD); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + + if (filter == null || isFilterMet(filter)) { + removeEventKey(field); + } else + log.info(FILTER_NOT_MET); + } + + + private void renameArrayInArray(JSONObject jsonObject) // map + { + log.info("renameArrayInArray"); + final String field = jsonObject.getString(FIELD); + final String oldField = jsonObject.getString(OLD_FIELD); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + + if (filter == null || isFilterMet(filter)) { + + final String[] fsplit = field.split(REGEX, field.length()); + final String[] oldfsplit = oldField.split(REGEX, oldField.length()); + + final String oldValue = getEventObjectVal(oldfsplit[0]).toString(); + if (!oldValue.equals(OBJECT_NOT_FOUND)) { + final String oldArrayName = oldfsplit[1].substring(1); + final String newArrayName = fsplit[1].substring(1); + final String value = oldValue.replaceAll(oldArrayName, newArrayName); + + log.info("oldValue ==" + oldValue); + log.info("value ==" + value); + JSONArray ja = new JSONArray(value); + removeEventKey(oldfsplit[0]); + setEventObjectVal(fsplit[0], ja); + } + } else + log.info(FILTER_NOT_MET); + } + + + public void map(JSONObject jsonObject) { + + final String field = jsonObject.getString(FIELD); + if (field.contains("[]")) { + if (field.matches(".*\\[\\]\\..*\\[\\]")) + renameArrayInArray(jsonObject); + else + mapToJArray(jsonObject); + } else + mapAttribute(jsonObject); + } + + private String performOperation(String operation, String value) { + log.info("performOperation"); + if ("convertMBtoKB".equals(operation)) { + float kbValue = Float.parseFloat(value) * 1024; + value = String.valueOf(kbValue); + } + return value; + } + + + public void mapAttribute(JSONObject jsonObject) { + + final String field = jsonObject.getString(FIELD); + final String oldField = jsonObject.getString(OLD_FIELD); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + final String operation = jsonObject.optString("operation"); + String value; + if (filter == null || isFilterMet(filter)) { + + value = getEventObjectVal(oldField).toString(); + if (!value.equals(OBJECT_NOT_FOUND)) { + if (operation != null && !operation.isEmpty()) + value = performOperation(operation, value); + + setEventObjectVal(field, value); + + removeEventKey(oldField); + } + } else + log.info(FILTER_NOT_MET); + } + + + private void mapToJArray(JSONObject jsonObject) { + log.info("mapToJArray"); + String field = jsonObject.getString(FIELD); + String oldField = jsonObject.getString(OLD_FIELD); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + final JSONObject attrMap = jsonObject.optJSONObject("attrMap"); + oldField = oldField.replaceAll(REGEX, ""); + field = field.replaceAll(REGEX, ""); + + if (filter == null || isFilterMet(filter)) { + + String value = getEventObjectVal(oldField).toString(); + if (!value.equals(OBJECT_NOT_FOUND)) { + log.info("old value ==" + value); + // update old value based on attrMap + if (attrMap != null) { + // loop thru attrMap and update attribute name to new name + for (String key : attrMap.keySet()) { + value = value.replaceAll(key, attrMap.getString(key)); + } + } + + log.info("new value ==" + value); + char c = value.charAt(0); + if (c != '[') { + // oldfield is JsonObject + JSONObject valueJO = new JSONObject(value); + // if the array already exists + String existingValue = getEventObjectVal(field).toString(); + if (!existingValue.equals(OBJECT_NOT_FOUND)) { + JSONArray ja = new JSONArray(existingValue); + JSONObject jo = ja.optJSONObject(0); + if (jo != null) { + for (String key : valueJO.keySet()) { + jo.put(key, valueJO.get(key)); + + } + ja.put(0, jo); + + setEventObjectVal(field, ja); + } + } else // if new array + setEventObjectVal(field + "[0]", new JSONObject(value), "JArray"); + } else // oldfield is jsonArray + setEventObjectVal(field, new JSONArray(value)); + + removeEventKey(oldField); + } + } else + log.info(FILTER_NOT_MET); + } + + /** + * example - { "functionName": "concatenateValue", "args":{ "filter": + * {"event.commonEventHeader.event":"heartbeat"}, + * FIELD:"event.commonEventHeader.eventName", "concatenate": + * ["event.commonEventHeader.domain","event.commonEventHeader.eventType","event.commonEventHeader.alarmCondition"], + * "delimiter":"_" } } + **/ + public void concatenateValue(JSONObject jsonObject) { + + final String field = jsonObject.getString(FIELD); + final String delimiter = jsonObject.getString("delimiter"); + final JSONArray values = jsonObject.getJSONArray("concatenate"); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + if (filter == null || isFilterMet(filter)) { + StringBuilder value = new StringBuilder(); + for (int i = 0; i < values.length(); i++) { + + String tempVal = evaluate(values.getString(i)); + if (!tempVal.equals(OBJECT_NOT_FOUND)) { + if (i == 0) + value.append(tempVal); + else + value.append(delimiter).append(tempVal); + } + } + + setEventObjectVal(field, value.toString()); + } else + log.info(FILTER_NOT_MET); + } + + public void subtractValue(JSONObject jsonObject) { + + final String field = jsonObject.getString(FIELD); + final JSONArray values = jsonObject.getJSONArray("subtract"); + final JSONObject filter = jsonObject.optJSONObject(FILTER); + if (filter == null || isFilterMet(filter)) { + float value = 0; + for (int i = 0; i < values.length(); i++) { + log.info(values.getString(i)); + String tempVal = evaluate(values.getString(i)); + log.info("tempVal==" + tempVal); + if (!tempVal.equals(OBJECT_NOT_FOUND)) { + if (i == 0) + value = value + Float.valueOf(tempVal); + else + value = value - Float.valueOf(tempVal); + } + } + log.info("value ==" + value); + setEventObjectVal(field, value, "number"); + } else + log.info(FILTER_NOT_MET); + } + + + private void removeEventKey(String field) { + String[] keySet = field.split("\\.", field.length()); + JSONObject keySeries = event; + for (int i = 0; i < (keySet.length - 1); i++) { + + keySeries = keySeries.getJSONObject(keySet[i]); + } + + keySeries.remove(keySet[keySet.length - 1]); + } + + + private boolean checkFilter(JSONObject jo, String key, String logicKey) { + String filterValue = jo.getString(key); + if (filterValue.contains(":")) { + String[] splitVal = filterValue.split(":"); + if ("matches".equals(splitVal[0])) { + if ("not".equals(logicKey)) { + if (getEventObjectVal(key).toString().matches(splitVal[1])) { + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); + return false; + } + } else { + if (!(getEventObjectVal(key).toString().matches(splitVal[1]))) { + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); + return false; + } + } + + } + if ("contains".equals(splitVal[0])) { + if ("not".equals(logicKey)) { + if (getEventObjectVal(key).toString().contains(splitVal[1])) { + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); + return false; + } + } else { + if (!(getEventObjectVal(key).toString().contains(splitVal[1]))) { + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); + return false; + } + } + + } + } else { + if ("not".equals(logicKey)) { + if (getEventObjectVal(key).toString().equals(filterValue)) { + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); + return false; + } + } else { + if (!(getEventObjectVal(key).toString().equals(filterValue))) { + log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE); + return false; + } + } + } + return true; + } + + + public boolean isFilterMet(JSONObject jo) { + for (String key : jo.keySet()) { + if ("not".equals(key)) { + JSONObject njo = jo.getJSONObject(key); + for (String njoKey : njo.keySet()) { + if (!checkFilter(njo, njoKey, key)) + return false; + } + } else { + if (!checkFilter(jo, key, key)) + return false; + } + } + return true; + } + + /** + * returns a string or JSONObject or JSONArray + **/ + public Object getEventObjectVal(String keySeriesStr) { + keySeriesStr = keySeriesStr.replaceAll("\\[", "."); + keySeriesStr = keySeriesStr.replaceAll("\\]", "."); + if (keySeriesStr.contains("..")) { + keySeriesStr = keySeriesStr.replaceAll("\\.\\.", "."); + } + + if (keySeriesStr.lastIndexOf(".") == keySeriesStr.length() - 1) + keySeriesStr = keySeriesStr.substring(0, keySeriesStr.length() - 1); + String[] keySet = keySeriesStr.split("\\.", keySeriesStr.length()); + Object keySeriesObj = event; + for (String aKeySet : keySet) { + if (keySeriesObj != null) { + if (keySeriesObj instanceof String) { + + log.info("STRING==" + keySeriesObj); + } else if (keySeriesObj instanceof JSONArray) { + keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(aKeySet)); + + } else if (keySeriesObj instanceof JSONObject) { + keySeriesObj = ((JSONObject) keySeriesObj).opt(aKeySet); + + } else { + log.info("unknown object==" + keySeriesObj); + } + } + } + + if (keySeriesObj == null) + return OBJECT_NOT_FOUND; + return keySeriesObj; + } + + public void setEventObjectVal(String keySeriesStr, Object value) { + setEventObjectVal(keySeriesStr, value, "string"); + } + + /** + * returns a string or JSONObject or JSONArray + **/ + public void setEventObjectVal(String keySeriesStr, Object value, String fieldType) { + keySeriesStr = keySeriesStr.replaceAll("\\[", "."); + keySeriesStr = keySeriesStr.replaceAll("\\]", "."); + if (keySeriesStr.contains("..")) { + keySeriesStr = keySeriesStr.replaceAll("\\.\\.", "."); + } + log.info("fieldType==" + fieldType); + + if (keySeriesStr.lastIndexOf(".") == keySeriesStr.length() - 1) + keySeriesStr = keySeriesStr.substring(0, keySeriesStr.length() - 1); + String[] keySet = keySeriesStr.split("\\.", keySeriesStr.length()); + Object keySeriesObj = event; + for (int i = 0; i < (keySet.length - 1); i++) { + + if (keySeriesObj instanceof JSONArray) { + + if (((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i])) == null) // if + // the + // object + // is + // not + // there + // then + // add + // it + { + log.info("Object is null, must add it"); + if (keySet[i + 1].matches("[0-9]*")) // if index then array + ((JSONArray) keySeriesObj).put(Integer.parseInt(keySet[i]), new JSONArray()); + else + ((JSONArray) keySeriesObj).put(Integer.parseInt(keySet[i]), new JSONObject()); + } + keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i])); + + } else if (keySeriesObj instanceof JSONObject) { + if (((JSONObject) keySeriesObj).opt(keySet[i]) == null) // if + // the + // object + // is + // not + // there + // then + // add + // it + { + if (keySet[i + 1].matches("[0-9]*")) // if index then array + ((JSONObject) keySeriesObj).put(keySet[i], new JSONArray()); + else + ((JSONObject) keySeriesObj).put(keySet[i], new JSONObject()); + log.info("Object is null, must add it"); + } + keySeriesObj = ((JSONObject) keySeriesObj).opt(keySet[i]); + } else { + log.info("unknown object==" + keySeriesObj); + } + } + if ("number".equals(fieldType)) { + DecimalFormat df = new DecimalFormat("#.0"); + if (value instanceof String) + ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], + Float.valueOf(df.format(Float.valueOf((String) value)))); + else + ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], Float.valueOf(df.format(value))); + } else if ("integer".equals(fieldType) && value instanceof String) + ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], Integer.valueOf((String) value)); + else if ("JArray".equals(fieldType)) + ((JSONArray) keySeriesObj).put(value); + else + ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], value); + + } } diff --git a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java index a57ea3f0..7d27399d 100644 --- a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java +++ b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java @@ -25,7 +25,10 @@ import com.att.nsa.logging.LoggingContext; import com.att.nsa.logging.log4j.EcompFields;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
+import io.vavr.collection.Map;
import org.json.JSONObject;
+import org.onap.dcae.ApplicationSettings;
+import org.onap.dcae.VesApplication;
import org.onap.dcae.commonFunction.event.publishing.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,37 +38,38 @@ import java.io.IOException; import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.text.SimpleDateFormat;
-import java.util.Arrays;
import java.util.Date;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-class EventProcessor implements Runnable {
+public class EventProcessor implements Runnable {
+ static final Type EVENT_LIST_TYPE = new TypeToken<List<Event>>() {
+ }.getType();
private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
private static final String EVENT_LITERAL = "event";
private static final String COMMON_EVENT_HEADER = "commonEventHeader";
- static final Type EVENT_LIST_TYPE = new TypeToken<List<Event>>() {}.getType();
private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");
- static Map<String, String[]> streamidHash = new HashMap<>();
public JSONObject event;
private EventPublisher eventPublisher;
+ private Map<String, String[]> streamidHash;
+ private ApplicationSettings properties;
- public EventProcessor(EventPublisher eventPublisher) {
+
+ public EventProcessor(EventPublisher eventPublisher, ApplicationSettings properties) {
this.eventPublisher = eventPublisher;
- streamidHash = CommonStartup.streamID.toJavaMap();
+ this.properties = properties;
+ this.streamidHash = properties.dMaaPStreamsMapping();
}
@Override
public void run() {
try {
while (true) {
- event = CommonStartup.fProcessingInputQueue.take();
+ event = VesApplication.fProcessingInputQueue.take();
// As long as the producer is running we remove elements from
// the queue.
- log.info("QueueSize:" + CommonStartup.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event);
+ log.info("QueueSize:" + VesApplication.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event);
String uuid = event.get("VESuniqueId").toString();
LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
@@ -73,14 +77,12 @@ class EventProcessor implements Runnable { String domain = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain");
log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + domain);
- String[] streamIdList = streamidHash.get(domain);
- log.debug("streamIdList:" + Arrays.toString(streamIdList));
-
- if (streamIdList.length == 0) {
- log.error("No StreamID defined for publish - Message dropped" + event);
- } else {
- sendEventsToStreams(streamIdList);
- }
+ streamidHash.get(domain)
+ .onEmpty(() -> {
+ log.error("No StreamID defined for publish - Message dropped" + event);
+ }).forEach(streamIds -> {
+ sendEventsToStreams(streamIds);
+ });
log.debug("Message published" + event);
}
} catch (InterruptedException e) {
@@ -93,7 +95,7 @@ class EventProcessor implements Runnable { // Set collector timestamp in event payload before publish
addCurrentTimeToEvent(event);
- if (CommonStartup.eventTransformFlag) {
+ if (properties.eventTransformingEnabled()) {
// read the mapping json file
try (FileReader fr = new FileReader("./etc/eventTransform.json")) {
log.info("parse eventTransform.json");
@@ -168,5 +170,4 @@ class EventProcessor implements Runnable { method.invoke(configProcessors, parameter);
}
}
-}
-
+}
\ No newline at end of file diff --git a/src/main/java/org/onap/dcae/commonFunction/VESLogger.java b/src/main/java/org/onap/dcae/commonFunction/VESLogger.java index a967327e..2a392e81 100644 --- a/src/main/java/org/onap/dcae/commonFunction/VESLogger.java +++ b/src/main/java/org/onap/dcae/commonFunction/VESLogger.java @@ -8,9 +8,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -35,128 +35,126 @@ import java.util.UUID; public class VESLogger { - public static final String VES_AGENT = "VES_AGENT"; - public static final String REQUEST_ID = "requestId"; - private static final String IP_ADDRESS = "127.0.0.1"; - private static final String HOST_NAME = "localhost"; - - public static Logger auditLog; - public static Logger metricsLog; - public static Logger errorLog; - public static Logger debugLog; - - // Common LoggingContext - private static LoggingContext commonLC; - // Thread-specific LoggingContext - private static LoggingContext threadLC; - public LoggingContext lc; - - /** - * Returns the common LoggingContext instance that is the base context for - * all subsequent instances. - * - * @return the common LoggingContext - */ - public static LoggingContext getCommonLoggingContext() { - if (commonLC == null) { - commonLC = new LoggingContextFactory.Builder().build(); - final UUID uuid = UUID.randomUUID(); - - commonLC.put(REQUEST_ID, uuid.toString()); - } - return commonLC; - } - - /** - * Get a logging context for the current thread that's based on the common - * logging context. Populate the context with context-specific values. - * - * @param aUuid - * uuid for request id - * @return a LoggingContext for the current thread - */ - public static LoggingContext getLoggingContextForThread(UUID aUuid) { - // note that this operation requires everything from the common context - // to be (re)copied into the target context. That seems slow, but it - // actually - // helps prevent the thread from overwriting supposedly common data. It - // also - // should be fairly quick compared with the overhead of handling the - // actual - // service call. - - threadLC = new LoggingContextFactory.Builder().withBaseContext(getCommonLoggingContext()).build(); - // Establish the request-specific UUID, as long as we are here... - threadLC.put(REQUEST_ID, aUuid.toString()); - threadLC.put(EcompFields.kEndTimestamp, SaClock.now()); - - return threadLC; - } - - /** - * Get a logging context for the current thread that's based on the common - * logging context. Populate the context with context-specific values. - * - * @param aUuid - * uuid for request id - * @return a LoggingContext for the current thread - */ - public static LoggingContext getLoggingContextForThread(String aUuid) { - // note that this operation requires everything from the common context - // to be (re)copied into the target context. That seems slow, but it - // actually - // helps prevent the thread from overwriting supposedly common data. It - // also - // should be fairly quick compared with the overhead of handling the - // actual - // service call. - - threadLC = new LoggingContextFactory.Builder().withBaseContext(getCommonLoggingContext()).build(); - // Establish the request-specific UUID, as long as we are here... - threadLC.put(REQUEST_ID, aUuid); - threadLC.put("statusCode", "COMPLETE"); - threadLC.put(EcompFields.kEndTimestamp, SaClock.now()); - return threadLC; - } - - public static void setUpEcompLogging() { - - // Create ECOMP Logger instances - auditLog = LoggerFactory.getLogger("com.att.ecomp.audit"); - metricsLog = LoggerFactory.getLogger("com.att.ecomp.metrics"); - debugLog = LoggerFactory.getLogger("com.att.ecomp.debug"); - errorLog = LoggerFactory.getLogger("com.att.ecomp.error"); - - final LoggingContext lc = getCommonLoggingContext(); - - String ipAddr = IP_ADDRESS; - String hostname = HOST_NAME; - try { - final InetAddress ip = InetAddress.getLocalHost(); - hostname = ip.getCanonicalHostName(); - ipAddr = ip.getHostAddress(); - } catch (UnknownHostException x) { - Log.debug(x.getMessage()); - } - - lc.put("serverName", hostname); - lc.put("serviceName", "VESCollecor"); - lc.put("statusCode", "RUNNING"); - lc.put("targetEntity", "NULL"); - lc.put("targetServiceName", "NULL"); - lc.put("server", hostname); - lc.put("serverIpAddress", ipAddr); - - // instance UUID is meaningless here, so we just create a new one each - // time the - // server starts. One could argue each new instantiation of the service - // should - // have a new instance ID. - lc.put("instanceUuid", ""); - lc.put("severity", ""); - lc.put(EcompFields.kEndTimestamp, SaClock.now()); - lc.put("EndTimestamp", SaClock.now()); - lc.put("partnerName", "NA"); - } + public static final String VES_AGENT = "VES_AGENT"; + public static final String REQUEST_ID = "requestId"; + private static final String IP_ADDRESS = "127.0.0.1"; + private static final String HOST_NAME = "localhost"; + + public static Logger auditLog; + public static Logger metricsLog; + public static Logger errorLog; + public static Logger debugLog; + + // Common LoggingContext + private static LoggingContext commonLC; + // Thread-specific LoggingContext + private static LoggingContext threadLC; + public LoggingContext lc; + + /** + * Returns the common LoggingContext instance that is the base context for + * all subsequent instances. + * + * @return the common LoggingContext + */ + public static LoggingContext getCommonLoggingContext() { + if (commonLC == null) { + commonLC = new LoggingContextFactory.Builder().build(); + final UUID uuid = UUID.randomUUID(); + + commonLC.put(REQUEST_ID, uuid.toString()); + } + return commonLC; + } + + /** + * Get a logging context for the current thread that's based on the common + * logging context. Populate the context with context-specific values. + * + * @param aUuid uuid for request id + * @return a LoggingContext for the current thread + */ + public static LoggingContext getLoggingContextForThread(UUID aUuid) { + // note that this operation requires everything from the common context + // to be (re)copied into the target context. That seems slow, but it + // actually + // helps prevent the thread from overwriting supposedly common data. It + // also + // should be fairly quick compared with the overhead of handling the + // actual + // service call. + + threadLC = new LoggingContextFactory.Builder().withBaseContext(getCommonLoggingContext()).build(); + // Establish the request-specific UUID, as long as we are here... + threadLC.put(REQUEST_ID, aUuid.toString()); + threadLC.put(EcompFields.kEndTimestamp, SaClock.now()); + + return threadLC; + } + + /** + * Get a logging context for the current thread that's based on the common + * logging context. Populate the context with context-specific values. + * + * @param aUuid uuid for request id + * @return a LoggingContext for the current thread + */ + public static LoggingContext getLoggingContextForThread(String aUuid) { + // note that this operation requires everything from the common context + // to be (re)copied into the target context. That seems slow, but it + // actually + // helps prevent the thread from overwriting supposedly common data. It + // also + // should be fairly quick compared with the overhead of handling the + // actual + // service call. + + threadLC = new LoggingContextFactory.Builder().withBaseContext(getCommonLoggingContext()).build(); + // Establish the request-specific UUID, as long as we are here... + threadLC.put(REQUEST_ID, aUuid); + threadLC.put("statusCode", "COMPLETE"); + threadLC.put(EcompFields.kEndTimestamp, SaClock.now()); + return threadLC; + } + + public static void setUpEcompLogging() { + + // Create ECOMP Logger instances + auditLog = LoggerFactory.getLogger("com.att.ecomp.audit"); + metricsLog = LoggerFactory.getLogger("com.att.ecomp.metrics"); + debugLog = LoggerFactory.getLogger("com.att.ecomp.debug"); + errorLog = LoggerFactory.getLogger("com.att.ecomp.error"); + + final LoggingContext lc = getCommonLoggingContext(); + + String ipAddr = IP_ADDRESS; + String hostname = HOST_NAME; + try { + final InetAddress ip = InetAddress.getLocalHost(); + hostname = ip.getCanonicalHostName(); + ipAddr = ip.getHostAddress(); + } catch (UnknownHostException x) { + Log.debug(x.getMessage()); + } + + lc.put("serverName", hostname); + lc.put("serviceName", "VESCollecor"); + lc.put("statusCode", "RUNNING"); + lc.put("targetEntity", "NULL"); + lc.put("targetServiceName", "NULL"); + lc.put("server", hostname); + lc.put("serverIpAddress", ipAddr); + + // instance UUID is meaningless here, so we just create a new one each + // time the + // server starts. One could argue each new instantiation of the service + // should + // have a new instance ID. + lc.put("instanceUuid", ""); + lc.put("severity", ""); + lc.put(EcompFields.kEndTimestamp, SaClock.now()); + lc.put("EndTimestamp", SaClock.now()); + lc.put("partnerName", "NA"); + } } diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java index 5865b12c..179e8826 100644 --- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java @@ -19,21 +19,19 @@ */ package org.onap.dcae.commonFunction.event.publishing; -import static io.vavr.API.List; -import static io.vavr.API.Try; -import static io.vavr.API.Tuple; -import static io.vavr.API.unchecked; -import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError; -import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; - import io.vavr.collection.List; import io.vavr.collection.Map; import io.vavr.control.Option; import io.vavr.control.Try; +import org.onap.dcae.commonFunction.AnyNode; + import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; -import org.onap.dcae.commonFunction.AnyNode; + +import static io.vavr.API.*; +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError; +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) @@ -43,23 +41,23 @@ public final class DMaaPConfigurationParser { public static Try<Map<String, PublisherConfig>> parseToDomainMapping(Path configLocation) { return readFromFile(configLocation) - .flatMap(DMaaPConfigurationParser::toJSON) - .flatMap(DMaaPConfigurationParser::toConfigMap); + .flatMap(DMaaPConfigurationParser::toJSON) + .flatMap(DMaaPConfigurationParser::toConfigMap); } private static Try<String> readFromFile(Path configLocation) { return Try(() -> new String(Files.readAllBytes(configLocation))) - .mapFailure(enhanceError(f("Could not read DMaaP configuration from location: '%s'", configLocation))); + .mapFailure(enhanceError(f("Could not read DMaaP configuration from location: '%s'", configLocation))); } private static Try<AnyNode> toJSON(String config) { return Try(() -> AnyNode.fromString(config)) - .mapFailure(enhanceError(f("DMaaP configuration '%s' is not a valid JSON document", config))); + .mapFailure(enhanceError(f("DMaaP configuration '%s' is not a valid JSON document", config))); } private static Try<Map<String, PublisherConfig>> toConfigMap(AnyNode config) { return Try(() -> usesLegacyFormat(config) ? parseLegacyFormat(config) : parseNewFormat(config)) - .mapFailure(enhanceError(f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config))); + .mapFailure(enhanceError(f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config))); } private static boolean usesLegacyFormat(AnyNode dMaaPConfig) { @@ -68,40 +66,40 @@ public final class DMaaPConfigurationParser { private static Map<String, PublisherConfig> parseLegacyFormat(AnyNode root) { return root.get("channels").toList().toMap( - channel -> channel.get("name").toString(), - channel -> { - String destinationsStr = channel.getAsOption("cambria.url") - .getOrElse(channel.getAsOption("cambria.hosts").get()) - .toString(); - String topic = channel.get("cambria.topic").toString(); - Option<String> maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString); - Option<String> maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString); - List<String> destinations = List(destinationsStr.split(",")); - return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations); - }); + channel -> channel.get("name").toString(), + channel -> { + String destinationsStr = channel.getAsOption("cambria.url") + .getOrElse(channel.getAsOption("cambria.hosts").get()) + .toString(); + String topic = channel.get("cambria.topic").toString(); + Option<String> maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString); + Option<String> maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString); + List<String> destinations = List(destinationsStr.split(",")); + return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations); + }); } private static Map<String, PublisherConfig> parseNewFormat(AnyNode root) { return root.keys().toMap( - channelName -> channelName, - channelName -> { - AnyNode channelConfig = root.get(channelName); - Option<String> maybeUser = channelConfig.getAsOption("aaf_username").map(AnyNode::toString); - Option<String> maybePassword = channelConfig.getAsOption("aaf_password").map(AnyNode::toString); - URL topicURL = unchecked( - () -> new URL(channelConfig.get("dmaap_info").get("topic_url").toString())).apply(); - String[] pathSegments = topicURL.getPath().substring(1).split("/"); - String topic = pathSegments[1]; - String destination = "events".equals(pathSegments[0]) ? topicURL.getAuthority() : topicURL.getHost(); - List<String> destinations = List(destination); - return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations); - }); + channelName -> channelName, + channelName -> { + AnyNode channelConfig = root.get(channelName); + Option<String> maybeUser = channelConfig.getAsOption("aaf_username").map(AnyNode::toString); + Option<String> maybePassword = channelConfig.getAsOption("aaf_password").map(AnyNode::toString); + URL topicURL = unchecked( + () -> new URL(channelConfig.get("dmaap_info").get("topic_url").toString())).apply(); + String[] pathSegments = topicURL.getPath().substring(1).split("/"); + String topic = pathSegments[1]; + String destination = "events".equals(pathSegments[0]) ? topicURL.getAuthority() : topicURL.getHost(); + List<String> destinations = List(destination); + return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations); + }); } private static PublisherConfig buildBasedOnAuth(Option<String> maybeUser, Option<String> maybePassword, String topic, List<String> destinations) { return maybeUser.flatMap(user -> maybePassword.map(password -> Tuple(user, password))) - .map(credentials -> new PublisherConfig(destinations, topic, credentials._1, credentials._2)) - .getOrElse(new PublisherConfig(destinations, topic)); + .map(credentials -> new PublisherConfig(destinations, topic, credentials._1, credentials._2)) + .getOrElse(new PublisherConfig(destinations, topic)); } } diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java index fd9b3ae1..a0ee3bfb 100644 --- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java @@ -21,20 +21,21 @@ package org.onap.dcae.commonFunction.event.publishing; -import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; - import com.att.nsa.cambria.client.CambriaBatchingPublisher; import com.att.nsa.clock.SaClock; import com.att.nsa.logging.LoggingContext; import com.att.nsa.logging.log4j.EcompFields; import io.vavr.collection.Map; import io.vavr.control.Try; -import java.io.IOException; import org.json.JSONObject; import org.onap.dcae.commonFunction.VESLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; + /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) */ @@ -55,9 +56,9 @@ class DMaaPEventPublisher implements EventPublisher { public void sendEvent(JSONObject event, String domain) { clearVesUniqueIdFromEvent(event); publishersCache.getPublisher(domain) - .onEmpty(() -> - log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event))) - .forEach(publisher -> sendEvent(event, domain, publisher)); + .onEmpty(() -> + log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event))) + .forEach(publisher -> sendEvent(event, domain, publisher)); } @Override @@ -67,11 +68,11 @@ class DMaaPEventPublisher implements EventPublisher { private void sendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) { Try.run(() -> uncheckedSendEvent(event, domain, publisher)) - .onFailure(exc -> closePublisher(event, domain, exc)); + .onFailure(exc -> closePublisher(event, domain, exc)); } private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) - throws IOException { + throws IOException { int pendingMsgs = publisher.send("MyPartitionKey", event.toString()); if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) { log.info("Pending messages count: " + pendingMsgs); @@ -83,7 +84,7 @@ class DMaaPEventPublisher implements EventPublisher { private void closePublisher(JSONObject event, String domain, Throwable e) { log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.", - event, domain), e); + event, domain), e); publishersCache.closePublisherFor(domain); } diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java index a7865a45..489fcbf0 100644 --- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java @@ -19,15 +19,15 @@ */ package org.onap.dcae.commonFunction.event.publishing; -import static io.vavr.API.Try; -import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError; -import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; - import com.att.nsa.cambria.client.CambriaBatchingPublisher; import com.att.nsa.cambria.client.CambriaClientBuilders; import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; import io.vavr.control.Try; +import static io.vavr.API.Try; +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError; +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; + /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) */ @@ -36,7 +36,7 @@ final class DMaaPPublishersBuilder { @SuppressWarnings("mapFailure takes a generic varargs, unchecked because of Javas type system limitation, actually safe to do") static Try<CambriaBatchingPublisher> buildPublisher(PublisherConfig config) { return Try(() -> builder(config).build()) - .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config))); + .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config))); } private static PublisherBuilder builder(PublisherConfig config) { @@ -49,14 +49,14 @@ final class DMaaPPublishersBuilder { private static PublisherBuilder authenticatedBuilder(PublisherConfig config) { return unAuthenticatedBuilder(config) - .usingHttps() - .authenticatedByHttp(config.userName().get(), config.password().get()); + .usingHttps() + .authenticatedByHttp(config.userName().get(), config.password().get()); } private static PublisherBuilder unAuthenticatedBuilder(PublisherConfig config) { return new CambriaClientBuilders.PublisherBuilder() - .usingHosts(config.destinations().mkString(",")) - .onTopic(config.topic()) - .logSendFailuresAfter(5); + .usingHosts(config.destinations().mkString(",")) + .onTopic(config.topic()) + .logSendFailuresAfter(5); } } diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java index 102d2774..4cdf92da 100644 --- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java @@ -20,24 +20,20 @@ */ package org.onap.dcae.commonFunction.event.publishing; -import static io.vavr.API.Option; -import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; - import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; +import com.google.common.cache.*; import io.vavr.collection.Map; import io.vavr.control.Option; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import javax.annotation.Nonnull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static io.vavr.API.Option; +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) @@ -51,8 +47,8 @@ class DMaaPPublishersCache { DMaaPPublishersCache(Map<String, PublisherConfig> dMaaPConfiguration) { this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); this.publishersCache = CacheBuilder.newBuilder() - .removalListener(new OnPublisherRemovalListener()) - .build(new CambriaPublishersCacheLoader()); + .removalListener(new OnPublisherRemovalListener()) + .build(new CambriaPublishersCacheLoader()); } DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader, @@ -60,8 +56,8 @@ class DMaaPPublishersCache { Map<String, PublisherConfig> dMaaPConfiguration) { this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); this.publishersCache = CacheBuilder.newBuilder() - .removalListener(onPublisherRemovalListener) - .build(dMaaPPublishersCacheLoader); + .removalListener(onPublisherRemovalListener) + .build(dMaaPPublishersCacheLoader); } Option<CambriaBatchingPublisher> getPublisher(String streamID) { @@ -80,9 +76,9 @@ class DMaaPPublishersCache { synchronized void reconfigure(Map<String, PublisherConfig> newConfig) { Map<String, PublisherConfig> currentConfig = dMaaPConfiguration.get(); Map<String, PublisherConfig> removedConfigurations = currentConfig - .filterKeys(domain -> !newConfig.containsKey(domain)); + .filterKeys(domain -> !newConfig.containsKey(domain)); Map<String, PublisherConfig> changedConfigurations = newConfig - .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e))); + .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e))); dMaaPConfiguration.set(newConfig); removedConfigurations.merge(changedConfigurations).forEach(e -> publishersCache.invalidate(e._1)); } @@ -99,7 +95,7 @@ class DMaaPPublishersCache { java.util.List<?> stuck = publisher.close(timeout, unit); if (!stuck.isEmpty()) { log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', " - + "%s messages were dropped", stuck.size(), timeout, unit)); + + "%s messages were dropped", stuck.size(), timeout, unit)); } } catch (InterruptedException | IOException e) { log.error("Could not close Cambria publisher, some messages might have been dropped", e); @@ -113,11 +109,11 @@ class DMaaPPublishersCache { @Override public CambriaBatchingPublisher load(@Nonnull String domain) { return dMaaPConfiguration.get() - .get(domain) - .toTry(() -> new RuntimeException( - f("DMaaP configuration contains no configuration for domain: '%s'", domain))) - .flatMap(DMaaPPublishersBuilder::buildPublisher) - .get(); + .get(domain) + .toTry(() -> new RuntimeException( + f("DMaaP configuration contains no configuration for domain: '%s'", domain))) + .flatMap(DMaaPPublishersBuilder::buildPublisher) + .get(); } } diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java index 4a056778..f1cbb8e5 100644 --- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java @@ -21,6 +21,7 @@ package org.onap.dcae.commonFunction.event.publishing; import io.vavr.collection.List; import io.vavr.control.Option; + import java.util.Objects; /** @@ -76,9 +77,9 @@ public final class PublisherConfig { } PublisherConfig that = (PublisherConfig) o; return Objects.equals(destinations, that.destinations) && - Objects.equals(topic, that.topic) && - Objects.equals(userName, that.userName) && - Objects.equals(password, that.password); + Objects.equals(topic, that.topic) && + Objects.equals(userName, that.userName) && + Objects.equals(password, that.password); } @Override @@ -89,10 +90,10 @@ public final class PublisherConfig { @Override public String toString() { return "PublisherConfig{" + - "destinations=" + destinations + - ", topic='" + topic + '\'' + - ", userName='" + userName + '\'' + - ", password='" + password + '\'' + - '}'; + "destinations=" + destinations + + ", topic='" + topic + '\'' + + ", userName='" + userName + '\'' + + ", password='" + password + '\'' + + '}'; } } diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java index 9bf3ef8c..78f34ff4 100644 --- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java @@ -19,11 +19,11 @@ */ package org.onap.dcae.commonFunction.event.publishing; -import static io.vavr.API.$; - import io.vavr.API; import io.vavr.API.Match.Case; +import static io.vavr.API.$; + /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) */ diff --git a/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java b/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java index 99e269c1..ed42a5a4 100644 --- a/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java +++ b/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,171 +20,167 @@ package org.onap.dcae.controller;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.json.JSONArray;
import org.json.JSONObject;
import org.json.JSONTokener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStreamReader;
+import java.io.*;
import java.util.Map;
public class FetchDynamicConfig {
- private static final Logger log = LoggerFactory.getLogger(FetchDynamicConfig.class);
-
- public static String configFile = "/opt/app/KV-Configuration.json";
- private static String url;
- public static String retString;
- public static String retCBSString;
- private static Map<String, String> env;
+ private static final Logger log = LoggerFactory.getLogger(FetchDynamicConfig.class);
+
+ public static String configFile = "/opt/app/KV-Configuration.json";
+ public static String retString;
+ public static String retCBSString;
+ private static String url;
+ private static Map<String, String> env;
+
+ public FetchDynamicConfig() {
+ }
+
+ public static void main(String[] args) {
+ Boolean areEqual;
+ // Call consul api and identify the CBS Service address and port
+ getconsul();
+ // Construct and invoke CBS API to get application Configuration
+ getCBS();
+ // Verify if data has changed
+ areEqual = verifyConfigChange();
+ // If new config then write data returned into configFile for
+ // LoadDynamicConfig process
+ if (!areEqual) {
+ FetchDynamicConfig fc = new FetchDynamicConfig();
+ fc.writefile(retCBSString);
+ } else {
+ log.info("New config pull results identical - " + configFile + " NOT refreshed");
+ }
+ }
+
+ private static void getconsul() {
+
+ env = System.getenv();
+ for (Map.Entry<String, String> entry : env.entrySet()) {
+ log.info(entry.getKey() + ":" + entry.getValue());
+ }
+
+ if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE")) {
+ // && env.containsKey("HOSTNAME")) {
+ log.info(">>>Dynamic configuration to be fetched from ConfigBindingService");
+ url = env.get("CONSUL_HOST") + ":8500/v1/catalog/service/" + env.get("CONFIG_BINDING_SERVICE");
+
+ retString = executecurl(url);
+
+ } else {
+ log.info(">>>Static configuration to be used");
+ }
+
+ }
+
+ public static boolean verifyConfigChange() {
+
+ boolean areEqual = false;
+ // Read current data
+ try {
+ File f = new File(configFile);
+ if (f.exists() && !f.isDirectory()) {
+
+ String jsonData = LoadDynamicConfig.readFile(configFile);
+ JSONObject jsonObject = new JSONObject(jsonData);
+
+ ObjectMapper mapper = new ObjectMapper();
+
+ JsonNode tree1 = mapper.readTree(jsonObject.toString());
+ JsonNode tree2 = mapper.readTree(retCBSString);
+ areEqual = tree1.equals(tree2);
+ log.info("Comparison value:" + areEqual);
+ } else {
+ log.info("First time config file read: " + configFile);
+ }
+
+ } catch (IOException e) {
+ log.error("Comparison with new fetched data failed" + e.getMessage());
+
+ }
+
+ return areEqual;
+
+ }
+
+ public static void getCBS() {
+
+ env = System.getenv();
+ // consul return as array
+ JSONTokener temp = new JSONTokener(retString);
+ JSONObject cbsjobj = (JSONObject) new JSONArray(temp).get(0);
+
+ String urlPart1 = null;
+ if (cbsjobj.has("ServiceAddress") && cbsjobj.has("ServicePort")) {
+ urlPart1 = cbsjobj.getString("ServiceAddress") + ":" + cbsjobj.getInt("ServicePort");
+ }
+
+ log.info("CONFIG_BINDING_SERVICE DNS RESOLVED:" + urlPart1);
+
+ if (env.containsKey("HOSTNAME")) {
+ url = urlPart1 + "/service_component/" + env.get("HOSTNAME");
+ retCBSString = executecurl(url);
+ } else if (env.containsKey("SERVICE_NAME")) {
+ url = urlPart1 + "/service_component/" + env.get("SERVICE_NAME");
+ retCBSString = executecurl(url);
+ } else {
+ log.error("Service name environment variable - HOSTNAME/SERVICE_NAME not found within container ");
+ }
+
+ }
+
+ private static String executecurl(String url) {
+
+ String[] command = {"curl", "-v", url};
+ ProcessBuilder process = new ProcessBuilder(command);
+ Process p;
+ String result = null;
+ try {
+ p = process.start();
+ InputStreamReader ipr = new InputStreamReader(p.getInputStream());
+ BufferedReader reader = new BufferedReader(ipr);
+ StringBuilder builder = new StringBuilder();
+ String line;
+
+ while ((line = reader.readLine()) != null) {
+ builder.append(line);
+ }
+ result = builder.toString();
+ log.info(result);
- public FetchDynamicConfig() {
- }
+ reader.close();
+ ipr.close();
+ } catch (IOException e) {
+ log.error("error", e);
+ e.printStackTrace();
+ }
+ return result;
- public static void main(String[] args) {
- Boolean areEqual;
- // Call consul api and identify the CBS Service address and port
- getconsul();
- // Construct and invoke CBS API to get application Configuration
- getCBS();
- // Verify if data has changed
- areEqual = verifyConfigChange();
- // If new config then write data returned into configFile for
- // LoadDynamicConfig process
- if (! areEqual) {
- FetchDynamicConfig fc = new FetchDynamicConfig();
- fc.writefile(retCBSString);
- } else {
- log.info("New config pull results identical - " + configFile + " NOT refreshed");
- }
- }
+ }
- private static void getconsul() {
-
- env = System.getenv();
- for (Map.Entry<String, String> entry : env.entrySet()) {
- log.info(entry.getKey() + ":" + entry.getValue());
- }
-
- if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE")) {
- // && env.containsKey("HOSTNAME")) {
- log.info(">>>Dynamic configuration to be fetched from ConfigBindingService");
- url = env.get("CONSUL_HOST") + ":8500/v1/catalog/service/" + env.get("CONFIG_BINDING_SERVICE");
-
- retString = executecurl(url);
-
- } else {
- log.info(">>>Static configuration to be used");
- }
-
- }
-
- public static boolean verifyConfigChange() {
-
- boolean areEqual = false;
- // Read current data
- try {
- File f = new File(configFile);
- if (f.exists() && !f.isDirectory()) {
-
- String jsonData = LoadDynamicConfig.readFile(configFile);
- JSONObject jsonObject = new JSONObject(jsonData);
-
- ObjectMapper mapper = new ObjectMapper();
-
- JsonNode tree1 = mapper.readTree(jsonObject.toString());
- JsonNode tree2 = mapper.readTree(retCBSString);
- areEqual = tree1.equals(tree2);
- log.info("Comparison value:" + areEqual);
- } else {
- log.info("First time config file read: " + configFile);
- }
-
- } catch (IOException e) {
- log.error("Comparison with new fetched data failed" + e.getMessage());
-
- }
-
- return areEqual;
-
- }
-
- public static void getCBS() {
-
- env = System.getenv();
- // consul return as array
- JSONTokener temp = new JSONTokener(retString);
- JSONObject cbsjobj = (JSONObject) new JSONArray(temp).get(0);
-
- String urlPart1 = null;
- if (cbsjobj.has("ServiceAddress") && cbsjobj.has("ServicePort")) {
- urlPart1 = cbsjobj.getString("ServiceAddress") + ":" + cbsjobj.getInt("ServicePort");
- }
-
- log.info("CONFIG_BINDING_SERVICE DNS RESOLVED:" + urlPart1);
-
- if (env.containsKey("HOSTNAME")) {
- url = urlPart1 + "/service_component/" + env.get("HOSTNAME");
- retCBSString = executecurl(url);
- } else if (env.containsKey("SERVICE_NAME")) {
- url = urlPart1 + "/service_component/" + env.get("SERVICE_NAME");
- retCBSString = executecurl(url);
- } else {
- log.error("Service name environment variable - HOSTNAME/SERVICE_NAME not found within container ");
- }
-
- }
-
- public void writefile(String retCBSString) {
- log.info("URL to fetch configuration:" + url + " Return String:" + retCBSString);
-
- String indentedretstring = (new JSONObject(retCBSString)).toString(4);
-
- try (FileWriter file = new FileWriter(FetchDynamicConfig.configFile)) {
- file.write(indentedretstring);
-
- log.info("Successfully Copied JSON Object to file " + configFile);
- } catch (IOException e) {
- log.error("Error in writing configuration into file " + configFile + retString + e.getMessage());
- e.printStackTrace();
- }
-
- }
-
- private static String executecurl(String url) {
+ public void writefile(String retCBSString) {
+ log.info("URL to fetch configuration:" + url + " Return String:" + retCBSString);
- String[] command = { "curl", "-v", url };
- ProcessBuilder process = new ProcessBuilder(command);
- Process p;
- String result = null;
- try {
- p = process.start();
- InputStreamReader ipr = new InputStreamReader(p.getInputStream());
- BufferedReader reader = new BufferedReader(ipr);
- StringBuilder builder = new StringBuilder();
- String line;
+ String indentedretstring = (new JSONObject(retCBSString)).toString(4);
- while ((line = reader.readLine()) != null) {
- builder.append(line);
- }
- result = builder.toString();
- log.info(result);
+ try (FileWriter file = new FileWriter(FetchDynamicConfig.configFile)) {
+ file.write(indentedretstring);
- reader.close();
- ipr.close();
- } catch (IOException e) {
- log.error("error", e);
- e.printStackTrace();
- }
- return result;
+ log.info("Successfully Copied JSON Object to file " + configFile);
+ } catch (IOException e) {
+ log.error("Error in writing configuration into file " + configFile + retString + e.getMessage());
+ e.printStackTrace();
+ }
- }
+ }
}
diff --git a/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java b/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java index a8ecaba0..c1ab80c1 100644 --- a/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java +++ b/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -35,95 +35,94 @@ import java.util.Map; public class LoadDynamicConfig { - private static final Logger log = LoggerFactory.getLogger(LoadDynamicConfig.class); - - public String propFile = "collector.properties"; - public String configFile = "/opt/app/KV-Configuration.json"; - public String dMaaPOutputFile = "./etc/DmaapConfig.json"; - - public LoadDynamicConfig() { - - } - - public static void main(String[] args) { - Map<String, String> env = System.getenv(); - - // Check again to ensure new controller deployment related config - if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE") - && env.containsKey("HOSTNAME")) { - - try { - - LoadDynamicConfig lc = new LoadDynamicConfig(); - String jsonData = readFile(lc.configFile); - JSONObject jsonObject = new JSONObject(jsonData); - lc.writeconfig(jsonObject); - - - } catch (Exception e) { - log.error(e.getLocalizedMessage(), e); - e.printStackTrace(); - - } - - } else { - log.info(">>>Static configuration to be used"); - } - - } - - public void writeconfig (JSONObject jsonObject) - { - - PropertiesConfiguration conf; - try { - conf = new PropertiesConfiguration(propFile); - - conf.setEncoding(null); - - // update properties based on consul dynamic configuration - Iterator<?> keys = jsonObject.keys(); - - while (keys.hasNext()) { - String key = (String) keys.next(); - // check if any configuration is related to dmaap - // and write into dmaapconfig.json - if (key.startsWith("streams_publishes")) { - // VESCollector only have publish streams - try (FileWriter file = new FileWriter(dMaaPOutputFile)) { - String indentedretstring=(new JSONObject(jsonObject.get(key).toString())).toString(4); - file.write(indentedretstring); - log.info("Successfully written JSON Object to DmaapConfig.json"); - } catch (IOException e) { - log.info("Error in writing dmaap configuration into DmaapConfig.json", e); - } - } else { - conf.setProperty(key, jsonObject.get(key).toString()); - } - - } - conf.save(); - } catch (ConfigurationException e) { - log.error(e.getLocalizedMessage(), e); - e.printStackTrace(); - } - } - - public static String readFile(String filename) { - String result = ""; - try (BufferedReader br = new BufferedReader(new FileReader(filename))) { - StringBuilder sb = new StringBuilder(); - String line = br.readLine(); - while (line != null) { - sb.append(line); - line = br.readLine(); - } - result = sb.toString(); - } catch (Exception e) { - log.error(e.getLocalizedMessage(), e); - e.printStackTrace(); - } - return result; - } + private static final Logger log = LoggerFactory.getLogger(LoadDynamicConfig.class); + + public String propFile = "collector.properties"; + public String configFile = "/opt/app/KV-Configuration.json"; + public String dMaaPOutputFile = "./etc/DmaapConfig.json"; + + public LoadDynamicConfig() { + + } + + public static void main(String[] args) { + Map<String, String> env = System.getenv(); + + // Check again to ensure new controller deployment related config + if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE") + && env.containsKey("HOSTNAME")) { + + try { + + LoadDynamicConfig lc = new LoadDynamicConfig(); + String jsonData = readFile(lc.configFile); + JSONObject jsonObject = new JSONObject(jsonData); + lc.writeconfig(jsonObject); + + + } catch (Exception e) { + log.error(e.getLocalizedMessage(), e); + e.printStackTrace(); + + } + + } else { + log.info(">>>Static configuration to be used"); + } + + } + + public static String readFile(String filename) { + String result = ""; + try (BufferedReader br = new BufferedReader(new FileReader(filename))) { + StringBuilder sb = new StringBuilder(); + String line = br.readLine(); + while (line != null) { + sb.append(line); + line = br.readLine(); + } + result = sb.toString(); + } catch (Exception e) { + log.error(e.getLocalizedMessage(), e); + e.printStackTrace(); + } + return result; + } + + public void writeconfig(JSONObject jsonObject) { + + PropertiesConfiguration conf; + try { + conf = new PropertiesConfiguration(propFile); + + conf.setEncoding(null); + + // update properties based on consul dynamic configuration + Iterator<?> keys = jsonObject.keys(); + + while (keys.hasNext()) { + String key = (String) keys.next(); + // check if any configuration is related to dmaap + // and write into dmaapconfig.json + if (key.startsWith("streams_publishes")) { + // VESCollector only have publish streams + try (FileWriter file = new FileWriter(dMaaPOutputFile)) { + String indentedretstring = (new JSONObject(jsonObject.get(key).toString())).toString(4); + file.write(indentedretstring); + log.info("Successfully written JSON Object to DmaapConfig.json"); + } catch (IOException e) { + log.info("Error in writing dmaap configuration into DmaapConfig.json", e); + } + } else { + conf.setProperty(key, jsonObject.get(key).toString()); + } + + } + conf.save(); + } catch (ConfigurationException e) { + log.error(e.getLocalizedMessage(), e); + e.printStackTrace(); + } + } } diff --git a/src/main/java/org/onap/dcae/restapi/ApiAuthInterceptor.java b/src/main/java/org/onap/dcae/restapi/ApiAuthInterceptor.java new file mode 100644 index 00000000..864a16d7 --- /dev/null +++ b/src/main/java/org/onap/dcae/restapi/ApiAuthInterceptor.java @@ -0,0 +1,78 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.restapi; + +import io.vavr.control.Option; +import org.onap.dcae.ApplicationSettings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; +import org.springframework.web.servlet.handler.HandlerInterceptorAdapter; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.util.Base64; + +final class ApiAuthInterceptor extends HandlerInterceptorAdapter { + + private static final Logger LOG = LoggerFactory.getLogger(ApiAuthInterceptor.class); + private final ApplicationSettings applicationSettings; + + private Logger errorLog; + + ApiAuthInterceptor(ApplicationSettings applicationSettings, Logger errorLog) { + this.applicationSettings = applicationSettings; + this.errorLog = errorLog; + } + + @Override + public boolean preHandle(HttpServletRequest request, HttpServletResponse response, + Object handler) throws IOException { + if (applicationSettings.authorizationEnabled()) { + String authorizationHeader = request.getHeader("Authorization"); + if (authorizationHeader == null || !isAuthorized(authorizationHeader)) { + response.setStatus(400); + errorLog.error("EVENT_RECEIPT_FAILURE: Unauthorized user"); + response.getWriter().write(ApiException.UNAUTHORIZED_USER.toJSON().toString()); + return false; + } + } + return true; + } + + private boolean isAuthorized(String authorizationHeader) { + try { + String encodedData = authorizationHeader.split(" ")[1]; + String decodedData = new String(Base64.getDecoder().decode(encodedData)); + String providedUser = decodedData.split(":")[0].trim(); + String providedPassword = decodedData.split(":")[1].trim(); + Option<String> maybeSavedPassword = applicationSettings.validAuthorizationCredentials().get(providedUser); + boolean userRegistered = maybeSavedPassword.isDefined(); + return userRegistered && maybeSavedPassword.get().equals(providedPassword); + } catch (Exception e) { + LOG.warn(String.format("Could not check if user is authorized (header: '%s')), probably malformed header.", + authorizationHeader), e); + return false; + } + } +} diff --git a/src/main/java/org/onap/dcae/restapi/endpoints/Ui.java b/src/main/java/org/onap/dcae/restapi/ApiConfiguration.java index ae593b44..85db81df 100644 --- a/src/main/java/org/onap/dcae/restapi/endpoints/Ui.java +++ b/src/main/java/org/onap/dcae/restapi/ApiConfiguration.java @@ -1,15 +1,16 @@ -/*- +/* * ============LICENSE_START======================================================= * PROJECT * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved.s * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,14 +19,28 @@ * ============LICENSE_END========================================================= */ -package org.onap.dcae.restapi.endpoints; +package org.onap.dcae.restapi; -import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint; -import com.att.nsa.drumlin.service.framework.context.DrumlinRequestContext; +import org.onap.dcae.ApplicationSettings; +import org.slf4j.Logger; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.servlet.config.annotation.InterceptorRegistry; +import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; -public class Ui extends NsaBaseEndpoint { +@Configuration +public class ApiConfiguration implements WebMvcConfigurer { + private final ApplicationSettings applicationSettings; + private Logger errorLogger; - public static void hello(DrumlinRequestContext ctx) { - ctx.renderer().renderTemplate("templates/hello.html"); + @Autowired + ApiConfiguration(ApplicationSettings applicationSettings, Logger errorLogger) { + this.applicationSettings = applicationSettings; + this.errorLogger = errorLogger; + } + + @Override + public void addInterceptors(InterceptorRegistry registry) { + registry.addInterceptor(new ApiAuthInterceptor(applicationSettings, errorLogger)); } } diff --git a/src/main/java/org/onap/dcae/restapi/ApiException.java b/src/main/java/org/onap/dcae/restapi/ApiException.java index 0f922678..53895ffe 100644 --- a/src/main/java/org/onap/dcae/restapi/ApiException.java +++ b/src/main/java/org/onap/dcae/restapi/ApiException.java @@ -33,10 +33,10 @@ public enum ApiException { UNAUTHORIZED_USER(ExceptionType.POLICY_EXCEPTION, "POL2000", "Unauthorized user", 401), NO_SERVER_RESOURCES(ExceptionType.SERVICE_EXCEPTION, "SVC1000", "No server resources (internal processing queue full)", 503); + public final int httpStatusCode; private final ExceptionType type; private final String code; private final String details; - public final int httpStatusCode; ApiException(ExceptionType type, String code, String details, int httpStatusCode) { this.type = type; @@ -45,18 +45,9 @@ public enum ApiException { this.httpStatusCode = httpStatusCode; } - public enum ExceptionType { - SERVICE_EXCEPTION, POLICY_EXCEPTION; - - @Override - public String toString() { - return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, this.name()); - } - } - public JSONObject toJSON() { JSONObject exceptionTypeNode = new JSONObject(); - exceptionTypeNode.put("messageId", code ); + exceptionTypeNode.put("messageId", code); exceptionTypeNode.put("text", details); JSONObject requestErrorNode = new JSONObject(); @@ -67,4 +58,13 @@ public enum ApiException { return rootNode; } + public enum ExceptionType { + SERVICE_EXCEPTION, POLICY_EXCEPTION; + + @Override + public String toString() { + return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, this.name()); + } + } + } diff --git a/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java b/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java deleted file mode 100644 index e5a29e9f..00000000 --- a/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java +++ /dev/null @@ -1,127 +0,0 @@ -
-/*
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.restapi;
-
-import java.io.IOException;
-import java.net.URL;
-
-import javax.servlet.ServletException;
-
-import org.apache.tomcat.util.codec.binary.Base64;
-import org.onap.dcae.ApplicationSettings;
-import org.onap.dcae.commonFunction.CommonStartup;
-import org.onap.dcae.commonFunction.VESLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.nsa.apiServer.CommonServlet;
-import com.att.nsa.configs.ConfigDbException;
-import com.att.nsa.drumlin.service.framework.routing.DrumlinRequestRouter;
-import com.att.nsa.drumlin.service.framework.routing.playish.DrumlinPlayishRoutingFileSource;
-import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.loadException;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
-import com.att.nsa.security.NsaAuthenticator;
-
-import com.att.nsa.security.authenticators.SimpleAuthenticator;
-import com.att.nsa.security.db.simple.NsaSimpleApiKey;
-
-public class RestfulCollectorServlet extends CommonServlet
-{
-
- private static final long serialVersionUID = 1L;
- private static final Logger log = LoggerFactory.getLogger ( RestfulCollectorServlet.class );
-
- private static String authCredentialsList;
-
- public RestfulCollectorServlet ( ApplicationSettings settings ) throws loadException, missingReqdSetting
- {
- super ( settings.torrNvReadable(), "collector", false );
- authCredentialsList = settings.validAuthorizationCredentials();
- }
-
-
-
-
- /**
- * This is called once at server start. Use it to init any shared objects and setup the route mapping.
- */
- @Override
- protected void servletSetup () throws rrNvReadable.missingReqdSetting, rrNvReadable.invalidSettingValue, ServletException
- {
- super.servletSetup ();
-
- try {
- // the base class provides a bunch of things like API authentication and ECOMP compliant
- // logging. The Restful Collector likely doesn't need API authentication, so for now,
- // we init the base class services with an in-memory (and empty!) config DB.
- commonServletSetup ( ConfigDbType.MEMORY );
-
- VESLogger.setUpEcompLogging();
-
- // setup the servlet routing and error handling
- final DrumlinRequestRouter drr = getRequestRouter ();
-
- // you can tell the request router what to do when a particular kind of exception is thrown.
- drr.setHandlerForException(IllegalArgumentException.class,
- (ctx, cause) -> sendJsonReply (ctx, HttpStatusCodes.k400_badRequest, cause.getMessage() ));
-
- // load the routes from the config file
- final URL routes = findStream ( "routes.conf" );
- if ( routes == null ) throw new rrNvReadable.missingReqdSetting ( "No routing configuration." );
- final DrumlinPlayishRoutingFileSource drs = new DrumlinPlayishRoutingFileSource ( routes );
- drr.addRouteSource ( drs );
-
- if (CommonStartup.authflag) {
- NsaAuthenticator<NsaSimpleApiKey> NsaAuth;
- NsaAuth = createAuthenticator(authCredentialsList);
-
- this.getSecurityManager().addAuthenticator(NsaAuth);
- }
-
- log.info ( "Restful Collector Servlet is up." );
- }
- catch ( SecurityException | IOException | ConfigDbException e ) {
- throw new ServletException ( e );
- }
- }
-
- public NsaAuthenticator<NsaSimpleApiKey> createAuthenticator(String authCredentials) {
- NsaAuthenticator<NsaSimpleApiKey> authenticator = new SimpleAuthenticator();
- if (authCredentials != null) {
- String authpair[] = authCredentials.split("\\|");
- for (String pair : authpair) {
- String lineid[] = pair.split(",");
- String listauthid = lineid[0];
- String listauthpwd = new String(Base64.decodeBase64(lineid[1]));
- ((SimpleAuthenticator) authenticator).add(listauthid, listauthpwd);
- }
-
- } else {
- ((SimpleAuthenticator) authenticator).add("admin", "collectorpasscode");
- }
- return authenticator;
- }
-
-}
-
diff --git a/src/main/java/org/onap/dcae/restapi/ServletConfig.java b/src/main/java/org/onap/dcae/restapi/ServletConfig.java new file mode 100644 index 00000000..e8efa375 --- /dev/null +++ b/src/main/java/org/onap/dcae/restapi/ServletConfig.java @@ -0,0 +1,79 @@ +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved.s + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.restapi; + +import org.onap.dcae.ApplicationSettings; +import org.onap.dcae.SchemaValidator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.web.server.Ssl; +import org.springframework.boot.web.server.WebServerFactoryCustomizer; +import org.springframework.boot.web.servlet.server.ConfigurableServletWebServerFactory; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.nio.file.Paths; + +import static java.nio.file.Files.readAllBytes; + +@Component +public class ServletConfig implements WebServerFactoryCustomizer<ConfigurableServletWebServerFactory> { + + private static final Logger log = LoggerFactory.getLogger(SchemaValidator.class); + + @Autowired + private ApplicationSettings properties; + + @Override + public void customize(ConfigurableServletWebServerFactory container) { + if (properties.authorizationEnabled()) { + container.setSsl(createSSL()); + container.setPort(properties.httpsPort()); + } else { + container.setPort(properties.httpPort()); + } + } + + private Ssl createSSL() { + log.info("Enabling SSL"); + Ssl ssl = new Ssl(); + ssl.setEnabled(true); + String keyStore = Paths.get(properties.keystoreFileLocation()).toAbsolutePath().toString(); + log.info("Using keyStore path: " + keyStore); + ssl.setKeyStore(keyStore); + String keyPasswordFileLocation = Paths.get(properties.keystorePasswordFileLocation()).toAbsolutePath().toString(); + log.info("Using keyStore password from: " + keyPasswordFileLocation); + ssl.setKeyPassword(getKeyStorePassword(keyPasswordFileLocation)); + ssl.setKeyAlias(properties.keystoreAlias()); + return ssl; + } + + private String getKeyStorePassword(String location) { + try { + return new String(readAllBytes(Paths.get(location))); + } catch (IOException e) { + log.error("Could not read keystore password from: '" + location + "'.", e); + throw new RuntimeException(e); + } + } +}
\ No newline at end of file diff --git a/src/main/java/org/onap/dcae/restapi/VesRestController.java b/src/main/java/org/onap/dcae/restapi/VesRestController.java new file mode 100644 index 00000000..b7fc5f3b --- /dev/null +++ b/src/main/java/org/onap/dcae/restapi/VesRestController.java @@ -0,0 +1,214 @@ +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved.s + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.restapi; + +import static java.util.Optional.ofNullable; +import static java.util.stream.StreamSupport.stream; +import static org.springframework.http.ResponseEntity.accepted; +import static org.springframework.http.ResponseEntity.ok; + +import com.att.nsa.clock.SaClock; +import com.att.nsa.logging.LoggingContext; +import com.att.nsa.logging.log4j.EcompFields; +import com.github.fge.jackson.JsonLoader; +import com.github.fge.jsonschema.core.report.ProcessingReport; +import com.github.fge.jsonschema.main.JsonSchema; + +import java.util.UUID; +import java.util.concurrent.LinkedBlockingQueue; +import javax.servlet.http.HttpServletRequest; + +import org.json.JSONArray; +import org.json.JSONObject; +import org.onap.dcae.ApplicationSettings; +import org.onap.dcae.CollectorSchemas; +import org.onap.dcae.commonFunction.VESLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +@RestController +public class VesRestController { + + private static final Logger LOG = LoggerFactory.getLogger(VesRestController.class); + + private static final String FALLBACK_VES_VERSION = "v5"; + + @Autowired + private ApplicationSettings collectorProperties; + + @Autowired + private CollectorSchemas schemas; + + @Autowired + @Qualifier("metriclog") + private Logger metriclog; + + @Autowired + @Qualifier("incomingRequestsLogger") + private Logger incomingRequestsLogger; + + @Autowired + @Qualifier("errorLog") + private Logger errorLog; + + private LinkedBlockingQueue<JSONObject> inputQueue; + private String version; + + @Autowired + VesRestController(@Qualifier("incomingRequestsLogger") Logger incomingRequestsLogger, + @Qualifier("inputQueue") LinkedBlockingQueue<JSONObject> inputQueue) { + this.incomingRequestsLogger = incomingRequestsLogger; + this.inputQueue = inputQueue; + } + + @GetMapping("/") + String mainPage() { + return "Welcome to VESCollector"; + } + + //refactor in next iteration + @PostMapping(value = {"/eventListener/v1", + "/eventListener/v1/eventBatch", + "/eventListener/v2", + "/eventListener/v2/eventBatch", + "/eventListener/v3", + "/eventListener/v3/eventBatch", + "/eventListener/v4", + "/eventListener/v4/eventBatch", + "/eventListener/v5", + "/eventListener/v5/eventBatch"}, consumes = "application/json") + ResponseEntity<String> receiveEvent(@RequestBody String jsonPayload, HttpServletRequest httpServletRequest) { + String request = httpServletRequest.getRequestURI(); + extractVersion(request); + + JSONObject jsonObject; + try { + jsonObject = new JSONObject(jsonPayload); + } catch (Exception e) { + return ResponseEntity.badRequest().body(ApiException.INVALID_JSON_INPUT.toJSON().toString()); + } + + String uuid = setUpECOMPLoggingForRequest(); + incomingRequestsLogger.info(String.format( + "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'", + jsonObject, uuid, version, httpServletRequest.getRemoteHost())); + + if (collectorProperties.jsonSchemaValidationEnabled()) { + if (isBatchRequest(request) && (jsonObject.has("eventList") && (!jsonObject.has("event")))) { + if (!conformsToSchema(jsonObject, version)) { + return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED); + } + } else if (!isBatchRequest(request) && (!jsonObject.has("eventList") && (jsonObject.has("event")))) { + if (!conformsToSchema(jsonObject, version)) { + return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED); + } + } else { + return errorResponse(ApiException.INVALID_JSON_INPUT); + } + } + + JSONArray commonlyFormatted = convertToJSONArrayCommonFormat(jsonObject, request, uuid, version); + + if (!putEventsOnProcessingQueue(commonlyFormatted)) { + errorLog.error("EVENT_RECEIPT_FAILURE: QueueFull " + ApiException.NO_SERVER_RESOURCES); + return errorResponse(ApiException.NO_SERVER_RESOURCES); + } + return ok().contentType(MediaType.APPLICATION_JSON).body("Message Accepted"); + } + + private void extractVersion(String httpServletRequest) { + version = httpServletRequest.split("/")[2]; + } + + private ResponseEntity<String> errorResponse(ApiException noServerResources) { + return ResponseEntity.status(noServerResources.httpStatusCode) + .body(noServerResources.toJSON().toString()); + } + + private boolean putEventsOnProcessingQueue(JSONArray arrayOfEvents) { + for (int i = 0; i < arrayOfEvents.length(); i++) { + metriclog.info("EVENT_PUBLISH_START"); + if (!inputQueue.offer((JSONObject) arrayOfEvents.get(i))) { + return false; + } + } + LOG.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); + metriclog.info("EVENT_PUBLISH_END"); + return true; + } + + private boolean conformsToSchema(JSONObject payload, String version) { + try { + JsonSchema schema = ofNullable(schemas.getJSONSchemasMap(version).get(version)) + .orElse(schemas.getJSONSchemasMap(version).get(FALLBACK_VES_VERSION)); + ProcessingReport report = schema.validate(JsonLoader.fromString(payload.toString())); + if (!report.isSuccess()) { + LOG.warn("Schema validation failed for event: " + payload); + stream(report.spliterator(), false).forEach(e -> LOG.warn(e.getMessage())); + return false; + } + return report.isSuccess(); + } catch (Exception e) { + throw new RuntimeException("Unable to validate against schema", e); + } + } + + private static JSONArray convertToJSONArrayCommonFormat(JSONObject jsonObject, String request, + String uuid, String version) { + JSONArray asArrayEvents = new JSONArray(); + String vesUniqueIdKey = "VESuniqueId"; + String vesVersionKey = "VESversion"; + if (isBatchRequest(request)) { + JSONArray events = jsonObject.getJSONArray("eventList"); + for (int i = 0; i < events.length(); i++) { + JSONObject event = new JSONObject().put("event", events.getJSONObject(i)); + event.put(vesUniqueIdKey, uuid + "-" + i); + event.put(vesVersionKey, version); + asArrayEvents.put(event); + } + } else { + jsonObject.put(vesUniqueIdKey, uuid); + jsonObject.put(vesVersionKey, version); + asArrayEvents = new JSONArray().put(jsonObject); + } + return asArrayEvents; + } + + private static String setUpECOMPLoggingForRequest() { + final UUID uuid = UUID.randomUUID(); + LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); + localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); + return uuid.toString(); + } + + private static boolean isBatchRequest(String request) { + return request.contains("eventBatch"); + } +}
\ No newline at end of file diff --git a/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java b/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java deleted file mode 100644 index d60e2a11..00000000 --- a/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java +++ /dev/null @@ -1,247 +0,0 @@ -/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.restapi.endpoints;
-
-import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;
-import com.att.nsa.clock.SaClock;
-import com.att.nsa.drumlin.service.framework.context.DrumlinRequestContext;
-import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
-import com.att.nsa.drumlin.service.standards.MimeTypes;
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.log4j.EcompFields;
-import com.att.nsa.security.db.simple.NsaSimpleApiKey;
-import com.google.gson.JsonParser;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.util.Base64;
-import java.util.UUID;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
-import org.onap.dcae.commonFunction.CommonStartup;
-import org.onap.dcae.commonFunction.CommonStartup.QueueFullException;
-import org.onap.dcae.commonFunction.VESLogger;
-import org.onap.dcae.restapi.ApiException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EventReceipt extends NsaBaseEndpoint {
-
- private static final Logger log = LoggerFactory.getLogger(EventReceipt.class);
- private static final String MESSAGE = " Message:";
-
- public static void receiveVESEvent(DrumlinRequestContext ctx) {
- // the request body carries events. assume for now it's an array
- // of json objects that fits in memory. (See cambria's parsing for
- // handling large messages)
-
- NsaSimpleApiKey retkey = null;
-
-
- JSONObject jsonObject;
- InputStream istr = null;
- int arrayFlag = 0;
- String vesVersion = null;
- String userId=null;
-
- try {
-
-
- istr = ctx.request().getBodyStream();
- jsonObject = new JSONObject(new JSONTokener(istr));
-
- log.info("ctx getPathInContext: " + ctx.request().getPathInContext());
- Pattern p = Pattern.compile("(v\\d+)");
- Matcher m = p.matcher(ctx.request().getPathInContext());
-
- if (m.find()) {
- log.info("VES version:" + m.group());
- vesVersion = m.group();
- }
-
- final UUID uuid = UUID.randomUUID();
- LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
- localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
-
- if (ctx.request().getPathInContext().contains("eventBatch")) {
- CommonStartup.inlog.info(ctx.request().getRemoteAddress() + "VESUniqueID-Prefix:" + uuid
- + " VES Batch Input Messsage: " + jsonObject);
- log.info(ctx.request().getRemoteAddress() + "VESUniqueID-Prefix:" + uuid + " VES Batch Input Messsage: "
- + jsonObject);
- arrayFlag = 1;
- } else {
- CommonStartup.inlog.info(
- ctx.request().getRemoteAddress() + "VESUniqueID:" + uuid + " Input Messsage: " + jsonObject);
- log.info(ctx.request().getRemoteAddress() + "VESUniqueID:" + uuid + " Input Messsage: " + jsonObject);
-
- }
-
- try {
- if (CommonStartup.authflag) {
- userId = getUser (ctx);
- retkey = NsaBaseEndpoint.getAuthenticatedUser(ctx);
- }
- } catch (NullPointerException x) {
- //log.info("Invalid user request :" + userId + " FROM " + ctx.request().getRemoteAddress() + " " + ctx.request().getContentType() + MESSAGE + jsonObject);
- log.info(String.format("Unauthorized request %s FROM %s %s %s %s", getUser(ctx), ctx.request().getRemoteAddress(), ctx.request().getContentType(), MESSAGE, jsonObject));
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Unauthorized user" + userId + x);
- respondWithCustomMsginJson(ctx, ApiException.UNAUTHORIZED_USER);
- return;
- }
-
- if (schemaCheck(retkey, arrayFlag, jsonObject, vesVersion, ctx, uuid)) {
- return;
- }
-
- } catch (JSONException | NullPointerException | IOException x) {
- log.error(String.format("Couldn't parse JSON Array - HttpStatusCodes.k400_badRequest%d%s%s",
- HttpStatusCodes.k400_badRequest, MESSAGE, x.getMessage()));
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request " + x);
- respondWithCustomMsginJson(ctx, ApiException.INVALID_JSON_INPUT);
- return;
- } catch (QueueFullException e) {
- log.error("Collector internal queue full :" + e.getMessage(), e);
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e);
- respondWithCustomMsginJson(ctx, ApiException.NO_SERVER_RESOURCES);
- return;
- } finally {
- if (istr != null) {
- safeClose(istr);
- }
- }
- log.info("MessageAccepted and k200_ok to be sent");
- ctx.response().sendErrorAndBody(HttpStatusCodes.k200_ok, "Message Accepted", MimeTypes.kAppJson);
- }
-
-
- private static String getUser(DrumlinRequestContext ctx){
- String authorization = ctx.request().getFirstHeader("Authorization");
- if (authorization != null && authorization.startsWith("Basic")) {
- String base64Credentials = authorization.substring("Basic".length()).trim();
- String credentials = new String(Base64.getDecoder().decode(base64Credentials),
- Charset.forName("UTF-8"));
- final String[] values = credentials.split(":",2);
- log.debug("User:" + values[0] + " Pwd:" + values[1]);
- return values[0];
- }
- return null;
-
- }
-
- private static Boolean schemaCheck(NsaSimpleApiKey retkey, int arrayFlag,
- JSONObject jsonObject, String vesVersion,
- DrumlinRequestContext ctx, UUID uuid)
- throws JSONException, QueueFullException, IOException {
-
- JSONArray jsonArray;
- JSONArray jsonArrayMod = new JSONArray();
- JSONObject event;
- FileReader fr;
- if (retkey != null || !CommonStartup.authflag) {
- if (CommonStartup.schemaValidatorflag) {
- if ((arrayFlag == 1) && (jsonObject.has("eventList") && (!jsonObject.has("event")))
- || ((arrayFlag == 0) && (!jsonObject.has("eventList") && (jsonObject.has("event"))))) {
- fr = new FileReader(schemaFileVersion(vesVersion));
- String schema = new JsonParser().parse(fr).toString();
-
- String valresult = CommonStartup.validateAgainstSchema(jsonObject.toString(), schema);
- switch (valresult) {
- case "true":
- log.info("Validation successful");
- break;
- case "false":
- log.info("Validation failed");
- respondWithCustomMsginJson(ctx, ApiException.SCHEMA_VALIDATION_FAILED);
- return true;
- default:
- log.error("Validation errored" + valresult);
- respondWithCustomMsginJson(ctx, ApiException.INVALID_JSON_INPUT);
- return true;
- }
- } else {
- log.info("Validation failed");
- respondWithCustomMsginJson(ctx, ApiException.SCHEMA_VALIDATION_FAILED);
- return true;
- }
- if (arrayFlag == 1) {
- jsonArray = jsonObject.getJSONArray("eventList");
- log.info("Validation successful for all events in batch");
- for (int i = 0; i < jsonArray.length(); i++) {
- event = new JSONObject().put("event", jsonArray.getJSONObject(i));
- event.put("VESuniqueId", uuid + "-" + i);
- event.put("VESversion", vesVersion);
- jsonArrayMod.put(event);
- }
- log.info("Modified jsonarray:" + jsonArrayMod.toString());
- } else {
- jsonObject.put("VESuniqueId", uuid);
- jsonObject.put("VESversion", vesVersion);
- jsonArrayMod = new JSONArray().put(jsonObject);
- }
- }
-
- // reject anything that's not JSON
- if (!ctx.request().getContentType().equalsIgnoreCase("application/json")) {
- log.info(String.format("Rejecting request with content type %s Message:%s",
- ctx.request().getContentType(), jsonObject));
- respondWithCustomMsginJson(ctx, ApiException.INVALID_CONTENT_TYPE);
- return true;
- }
-
- CommonStartup.handleEvents(jsonArrayMod);
- } else {
- log.info(String.format("Unauthorized request %s FROM %s %s %s %s", getUser(ctx), ctx.request().getRemoteAddress(), ctx.request().getContentType(), MESSAGE,
- jsonObject));
- respondWithCustomMsginJson(ctx, ApiException.UNAUTHORIZED_USER);
- return true;
- }
- return false;
- }
-
- private static void respondWithCustomMsginJson(DrumlinRequestContext ctx, ApiException apiException) {
- ctx.response()
- .sendErrorAndBody(apiException.httpStatusCode,
- apiException.toJSON().toString(), MimeTypes.kAppJson);
- }
-
- private static void safeClose(InputStream is) {
- if (is != null) {
- try {
- is.close();
- } catch (IOException e) {
- log.error("Error closing Input stream : " + e);
- }
- }
-
- }
-
- public static String schemaFileVersion(String version) {
- return CommonStartup.schemaFileJson.has(version) ?
- CommonStartup.schemaFileJson.getString(version) : CommonStartup.schemaFileJson.getString("v5");
- }
-
-}
-
|