aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPawelSzalapski <pawel.szalapski@nokia.com>2018-07-31 08:18:03 +0200
committerPawelSzalapski <pawel.szalapski@nokia.com>2018-08-01 09:56:00 +0200
commitfc073344d4c0eb8a28bf34c07a8439176cf846ca (patch)
tree01f5b4789c3d9369eaebb54a9f910a9fa400af1f
parentd12cd3525284cc41414d8fdae09e2ffbc03a1fbb (diff)
Replace nsaCore library with Spring
Change-Id: I2227939a67a2cbba2d392136d49ef4419600d186 Issue-ID: DCAEGEN2-602 Signed-off-by: PawelSzalapski <pawel.szalapski@nokia.com>
-rwxr-xr-xetc/collector.properties4
-rw-r--r--etc/passwordfile2
-rw-r--r--pom.xml105
-rw-r--r--src/main/java/org/onap/dcae/ApplicationSettings.java54
-rw-r--r--src/main/java/org/onap/dcae/CLIUtils.java31
-rw-r--r--src/main/java/org/onap/dcae/CollectorSchemas.java74
-rw-r--r--src/main/java/org/onap/dcae/SchemaValidator.java77
-rw-r--r--src/main/java/org/onap/dcae/VesApplication.java110
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/AnyNode.java8
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/CommonStartup.java204
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java916
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/EventProcessor.java43
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/VESLogger.java248
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java76
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java19
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java20
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java44
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java17
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java4
-rw-r--r--src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java298
-rw-r--r--src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java183
-rw-r--r--src/main/java/org/onap/dcae/restapi/ApiAuthInterceptor.java78
-rw-r--r--src/main/java/org/onap/dcae/restapi/ApiConfiguration.java (renamed from src/main/java/org/onap/dcae/restapi/endpoints/Ui.java)33
-rw-r--r--src/main/java/org/onap/dcae/restapi/ApiException.java22
-rw-r--r--src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java127
-rw-r--r--src/main/java/org/onap/dcae/restapi/ServletConfig.java79
-rw-r--r--src/main/java/org/onap/dcae/restapi/VesRestController.java214
-rw-r--r--src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java247
-rw-r--r--src/main/scripts/VESrestfulCollector.sh2
-rw-r--r--src/test/java/org/onap/dcae/ApplicationSettingsTest.java35
-rw-r--r--src/test/java/org/onap/dcae/commonFunction/CommonStartupTest.java127
-rw-r--r--src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java29
-rw-r--r--src/test/java/org/onap/dcae/vestest/TestConfigProcessor.java36
-rw-r--r--src/test/java/org/onap/dcae/vestest/TestEventReceipt.java46
-rw-r--r--src/test/java/org/onap/dcae/vestest/TestJsonSchemaValidation.java14
35 files changed, 1762 insertions, 1864 deletions
diff --git a/etc/collector.properties b/etc/collector.properties
index 67c6d39c..4354eb69 100755
--- a/etc/collector.properties
+++ b/etc/collector.properties
@@ -24,8 +24,8 @@ collector.service.port=8080
collector.service.secure.port=8443
## The keystore must be setup per installation when secure port is configured
-collector.keystore.file.location=../etc/keystore
-collector.keystore.passwordfile=./etc/passwordfile
+collector.keystore.file.location=etc/keystore
+collector.keystore.passwordfile=etc/passwordfile
collector.keystore.alias=tomcat
diff --git a/etc/passwordfile b/etc/passwordfile
index 702a4cbd..25acfbf5 100644
--- a/etc/passwordfile
+++ b/etc/passwordfile
@@ -1 +1 @@
-collector
+collector \ No newline at end of file
diff --git a/pom.xml b/pom.xml
index c87a9d24..150d9b3f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -64,6 +64,18 @@ limitations under the License.
<pluginManagement>
<plugins>
<plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ <version>2.0.2.RELEASE</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>repackage</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
</plugin>
@@ -318,6 +330,19 @@ limitations under the License.
</plugins>
</reporting>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <!-- Import dependency management from Spring Boot -->
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-dependencies</artifactId>
+ <version>2.0.3.RELEASE</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
<dependencies>
<!-- JSON RELATED -->
<dependency>
@@ -348,54 +373,11 @@ limitations under the License.
<!-- REST API RELATED -->
<dependency>
- <groupId>org.apache.tomcat.embed</groupId>
- <artifactId>tomcat-embed-core</artifactId>
- <version>8.5.23</version>
- </dependency>
- <dependency>
- <groupId>org.apache.tomcat</groupId>
- <artifactId>tomcat-catalina</artifactId>
- <version>8.5.23</version>
- </dependency>
- <dependency>
- <groupId>org.apache.tomcat</groupId>
- <artifactId>tomcat-coyote</artifactId>
- <version>8.5.23</version>
- </dependency>
- <dependency>
<groupId>com.att.nsa</groupId>
- <artifactId>nsaServerLibrary</artifactId>
- <version>1.0.10</version>
- </dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- <version>4.5.3</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>2.8.11</version>
+ <artifactId>cambriaClient</artifactId>
+ <version>0.0.1</version>
</dependency>
- <!-- LOGGING RELATED -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.7.21</version>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>apache-log4j-extras</artifactId>
- <version>1.2.17</version>
- </dependency>
-
- <!-- MISCELLANEOUS -->
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>18.0</version>
- </dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
@@ -407,16 +389,33 @@ limitations under the License.
<version>1.10</version>
</dependency>
<dependency>
- <groupId>javax.mail</groupId>
- <artifactId>mail</artifactId>
- <version>1.4.7</version>
- </dependency>
- <dependency>
<groupId>io.vavr</groupId>
<artifactId>vavr</artifactId>
<version>0.9.2</version>
</dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-security</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.security</groupId>
+ <artifactId>spring-security-config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-log4j2</artifactId>
+ </dependency>
<!-- TESTING -->
<dependency>
@@ -426,6 +425,12 @@ limitations under the License.
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.8.0</version>
diff --git a/src/main/java/org/onap/dcae/ApplicationSettings.java b/src/main/java/org/onap/dcae/ApplicationSettings.java
index 0ebd1e90..9063faa4 100644
--- a/src/main/java/org/onap/dcae/ApplicationSettings.java
+++ b/src/main/java/org/onap/dcae/ApplicationSettings.java
@@ -21,41 +21,50 @@
package org.onap.dcae;
-import com.att.nsa.drumlin.till.nv.impl.nvReadableStack;
-import com.att.nsa.drumlin.till.nv.impl.nvReadableTable;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
import com.google.common.annotations.VisibleForTesting;
import io.vavr.Function1;
import io.vavr.collection.HashMap;
+import io.vavr.collection.List;
import io.vavr.collection.Map;
-import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+import javax.annotation.Nullable;
import java.io.File;
import java.nio.file.Paths;
+import java.util.Base64;
+
+import static java.util.Arrays.stream;
/**
* Abstraction over application configuration.
* Its job is to provide easily discoverable (by method names lookup) and type safe access to configuration properties.
*/
+@Component
public class ApplicationSettings {
private static final Logger inlog = LoggerFactory.getLogger(ApplicationSettings.class);
private static final String COLLECTOR_PROPERTIES = "etc/collector.properties";
+
+ private final String appInvocationDir;
private final PropertiesConfiguration properties = new PropertiesConfiguration();
public ApplicationSettings(String[] args, Function1<String[], Map<String, String>> argsParser) {
+ this(args, argsParser, System.getProperty("user.dir"));
+ }
+
+ public ApplicationSettings(String[] args, Function1<String[], Map<String, String>> argsParser, String appInvocationDir) {
+ this.appInvocationDir = appInvocationDir;
properties.setDelimiterParsingDisabled(true);
Map<String, String> parsedArgs = argsParser.apply(args);
loadProperties(Paths.get(new File(COLLECTOR_PROPERTIES).getAbsolutePath()).toString());
loadCommandLineProperties(parsedArgs);
parsedArgs.filterKeys(k -> !k.equals("c")).forEach(this::updateProperty);
}
-
private void loadCommandLineProperties(Map<String, String> parsedArgs) {
parsedArgs.get("c").forEach(e -> {
properties.clear();
@@ -63,7 +72,7 @@ public class ApplicationSettings {
});
}
- private void loadProperties(String property){
+ private void loadProperties(String property) {
try {
properties.load(property);
} catch (ConfigurationException ex) {
@@ -72,8 +81,13 @@ public class ApplicationSettings {
}
}
- public String validAuthorizationCredentials() {
- return properties.getString("header.authlist", null);
+ public Map<String, String> validAuthorizationCredentials() {
+ return prepareUsersMap(properties.getString("header.authlist", null));
+ }
+
+ private Map<String, String> prepareUsersMap(@Nullable String allowedUsers) {
+ return allowedUsers == null ? HashMap.empty() : List.ofAll(stream(allowedUsers.split("\\|")))
+ .toMap(t -> t.split(",")[0].trim(), t -> new String(Base64.getDecoder().decode(t.split(",")[1])).trim());
}
public int maximumAllowedQueuedEvents() {
@@ -110,11 +124,11 @@ public class ApplicationSettings {
}
public String keystorePasswordFileLocation() {
- return properties.getString("collector.keystore.passwordfile", "./etc/passwordfile");
+ return prependWithUserDirOnRelative(properties.getString("collector.keystore.passwordfile", "etc/passwordfile"));
}
public String keystoreFileLocation() {
- return properties.getString("collector.keystore.file.location", "../etc/keystore");
+ return prependWithUserDirOnRelative(properties.getString("collector.keystore.file.location", "etc/keystore"));
}
public String keystoreAlias() {
@@ -126,7 +140,7 @@ public class ApplicationSettings {
}
public String cambriaConfigurationFileLocation() {
- return properties.getString("collector.dmaapfile", "./etc/DmaapConfig.json");
+ return prependWithUserDirOnRelative(properties.getString("collector.dmaapfile", "etc/DmaapConfig.json"));
}
public Map<String, String[]> dMaaPStreamsMapping() {
@@ -138,17 +152,6 @@ public class ApplicationSettings {
}
}
- /*
- * Kept back here for backward compatibility.
- * RestfulCollectorServlet upon its initialization requires options to be represented
- * as object represented by rrNvReadable interface, so we define a a handy transformation function here.
- */
- public rrNvReadable torrNvReadable() {
- final nvReadableStack settings = new nvReadableStack();
- settings.push(new nvReadableTable(ConfigurationConverter.getProperties(properties)));
- return settings;
- }
-
private Map<String, String[]> convertDMaaPStreamsPropertyToMap(String streamIdsProperty) {
java.util.HashMap<String, String[]> domainToStreamIdsMapping = new java.util.HashMap<>();
String[] topics = streamIdsProperty.split("\\|");
@@ -168,6 +171,13 @@ public class ApplicationSettings {
}
}
+ public String prependWithUserDirOnRelative(String filePath) {
+ if (!Paths.get(filePath).isAbsolute()) {
+ filePath = Paths.get(appInvocationDir, filePath).toString();
+ }
+ return filePath;
+ }
+
@VisibleForTesting
String getStringDirectly(String key) {
return properties.getString(key);
diff --git a/src/main/java/org/onap/dcae/CLIUtils.java b/src/main/java/org/onap/dcae/CLIUtils.java
index 6450d2e5..6764d5b2 100644
--- a/src/main/java/org/onap/dcae/CLIUtils.java
+++ b/src/main/java/org/onap/dcae/CLIUtils.java
@@ -24,36 +24,29 @@ package org.onap.dcae;
import java.util.HashMap;
/**
- * CLIUtils extracted from nsaServerLibrary this implementation will be removed once we switch to different API library
+ * CLIUtils extracted from nsaServerLibrary this implementation will be removed once we switch to different API library
*/
public class CLIUtils {
- public static io.vavr.collection.HashMap<String, String> processCmdLine (String[] args) {
- final HashMap<String,String> map = new HashMap<String,String> ();
+ public static io.vavr.collection.HashMap<String, String> processCmdLine(String[] args) {
+ final HashMap<String, String> map = new HashMap<String, String>();
String lastKey = null;
- for ( String arg : args )
- {
- if ( arg.startsWith ( "-" ) )
- {
- if ( lastKey != null )
- {
- map.put ( lastKey.substring(1), "" );
+ for (String arg : args) {
+ if (arg.startsWith("-")) {
+ if (lastKey != null) {
+ map.put(lastKey.substring(1), "");
}
lastKey = arg;
- }
- else
- {
- if ( lastKey != null )
- {
- map.put ( lastKey.substring(1), arg );
+ } else {
+ if (lastKey != null) {
+ map.put(lastKey.substring(1), arg);
}
lastKey = null;
}
}
- if ( lastKey != null )
- {
- map.put ( lastKey.substring(1), "" );
+ if (lastKey != null) {
+ map.put(lastKey.substring(1), "");
}
return io.vavr.collection.HashMap.ofAll(map);
}
diff --git a/src/main/java/org/onap/dcae/CollectorSchemas.java b/src/main/java/org/onap/dcae/CollectorSchemas.java
new file mode 100644
index 00000000..fc12b1f9
--- /dev/null
+++ b/src/main/java/org/onap/dcae/CollectorSchemas.java
@@ -0,0 +1,74 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.s
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae;
+
+import static java.nio.file.Files.readAllBytes;
+import static java.util.stream.Collectors.toMap;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.github.fge.jackson.JsonLoader;
+import com.github.fge.jsonschema.core.exceptions.ProcessingException;
+import com.github.fge.jsonschema.main.JsonSchema;
+import com.github.fge.jsonschema.main.JsonSchemaFactory;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.AbstractMap;
+import java.util.Map;
+import org.json.JSONObject;
+import org.onap.dcae.restapi.VesRestController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class CollectorSchemas {
+
+ private static final Logger LOG = (Logger) LoggerFactory.getLogger(VesRestController.class);
+
+ @Autowired
+ private ApplicationSettings collectorProperties;
+
+ //refactor is needed in next iteration
+ public Map<String, JsonSchema> getJSONSchemasMap(String version) {
+ JSONObject jsonObject = collectorProperties.jsonSchema();
+ Map<String, JsonSchema> schemas = jsonObject.toMap().entrySet().stream().map(
+ versionToFilePath -> {
+ try {
+ String schemaContent = new String(
+ readAllBytes(Paths.get(versionToFilePath.getValue().toString())));
+ JsonNode schemaNode = JsonLoader.fromString(schemaContent);
+ JsonSchema schema = JsonSchemaFactory.byDefault().getJsonSchema(schemaNode);
+ return new AbstractMap.SimpleEntry<>(versionToFilePath.getKey(), schema);
+ } catch (IOException | ProcessingException e) {
+ LOG.error("Could not read schema from path: " + versionToFilePath.getValue(), e);
+ throw new RuntimeException(
+ "Could not read schema from path: " + versionToFilePath.getValue(), e);
+ }
+ }
+ ).collect(toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
+ if (schemas.get(version) == null && collectorProperties.eventTransformingEnabled()) {
+ LOG.error(String.format("Missing necessary %s JSON schema", version));
+ throw new RuntimeException(String.format("Missing necessary %s JSON schema", version));
+ }
+ return schemas;
+ }
+} \ No newline at end of file
diff --git a/src/main/java/org/onap/dcae/SchemaValidator.java b/src/main/java/org/onap/dcae/SchemaValidator.java
new file mode 100644
index 00000000..e4b52cfb
--- /dev/null
+++ b/src/main/java/org/onap/dcae/SchemaValidator.java
@@ -0,0 +1,77 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.s
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.github.fge.jackson.JsonLoader;
+import com.github.fge.jsonschema.core.exceptions.ProcessingException;
+import com.github.fge.jsonschema.core.report.ProcessingMessage;
+import com.github.fge.jsonschema.core.report.ProcessingReport;
+import com.github.fge.jsonschema.main.JsonSchema;
+import com.github.fge.jsonschema.main.JsonSchemaFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class SchemaValidator {
+
+ private static final Logger log = LoggerFactory.getLogger(SchemaValidator.class);
+
+ //refactor in next iteration
+ public static String validateAgainstSchema(String jsonData, String jsonSchema) {
+ ProcessingReport report;
+ String result = "false";
+
+ try {
+ log.trace("Schema validation for event:" + jsonData);
+ JsonNode schemaNode = JsonLoader.fromString(jsonSchema);
+ JsonNode data = JsonLoader.fromString(jsonData);
+ JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
+ JsonSchema schema = factory.getJsonSchema(schemaNode);
+ report = schema.validate(data);
+ } catch (JsonParseException e) {
+ log.error("validateAgainstSchema:JsonParseException for event:" + jsonData);
+ return e.getMessage();
+ } catch (ProcessingException e) {
+ log.error("validateAgainstSchema:Processing exception for event:" + jsonData);
+ return e.getMessage();
+ } catch (IOException e) {
+ log.error(
+ "validateAgainstSchema:IO exception; something went wrong trying to read json data for event:" + jsonData);
+ return e.getMessage();
+ }
+ if (report != null) {
+ for (ProcessingMessage pm : report) {
+ log.trace("Processing Message: " + pm.getMessage());
+ }
+ result = String.valueOf(report.isSuccess());
+ }
+ try {
+ log.debug("Validation Result:" + result + " Validation report:" + report);
+ } catch (NullPointerException e) {
+ log.error("validateAgainstSchema:NullpointerException on report");
+ }
+ return result;
+ }
+}
diff --git a/src/main/java/org/onap/dcae/VesApplication.java b/src/main/java/org/onap/dcae/VesApplication.java
new file mode 100644
index 00000000..86b8ccb0
--- /dev/null
+++ b/src/main/java/org/onap/dcae/VesApplication.java
@@ -0,0 +1,110 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae;
+
+import io.vavr.collection.Map;
+import org.json.JSONObject;
+import org.onap.dcae.commonFunction.EventProcessor;
+import org.onap.dcae.commonFunction.event.publishing.DMaaPConfigurationParser;
+import org.onap.dcae.commonFunction.event.publishing.EventPublisher;
+import org.onap.dcae.commonFunction.event.publishing.PublisherConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration;
+import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Lazy;
+
+import java.nio.file.Paths;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+@SpringBootApplication
+@EnableAutoConfiguration(exclude = {GsonAutoConfiguration.class, SecurityAutoConfiguration.class})
+public class VesApplication {
+
+ private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");
+ private static final Logger incomingRequestsLogger = LoggerFactory.getLogger("org.onap.dcae.commonFunction.input");
+ private static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.output");
+ private static final Logger errorLog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.error");
+ private static final int MAX_THREADS = 20;
+ public static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;
+ private static ApplicationSettings properties;
+
+ public static void main(String[] args) {
+ SpringApplication app = new SpringApplication(VesApplication.class);
+
+ properties = new ApplicationSettings(args, CLIUtils::processCmdLine);
+
+ fProcessingInputQueue = new LinkedBlockingQueue<>(properties.maximumAllowedQueuedEvents());
+
+ app.setAddCommandLineProperties(true);
+ app.run();
+
+ EventProcessor ep = new EventProcessor(EventPublisher.createPublisher(oplog, getDmapConfig()), properties);
+
+ ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS);
+ for (int i = 0; i < MAX_THREADS; ++i) {
+ executor.execute(ep);
+ }
+ }
+
+
+ private static Map<String, PublisherConfig> getDmapConfig() {
+ return DMaaPConfigurationParser.
+ parseToDomainMapping(Paths.get(properties.cambriaConfigurationFileLocation())).get();
+ }
+
+ @Bean
+ @Lazy
+ public ApplicationSettings applicationSettings() {
+ return properties;
+ }
+
+ @Bean
+ @Qualifier("incomingRequestsLogger")
+ public Logger incomingRequestsLogger() {
+ return incomingRequestsLogger;
+ }
+
+ @Bean
+ @Qualifier("metriclog")
+ public Logger incomingRequestsMetricsLogger() {
+ return metriclog;
+ }
+
+ @Bean
+ @Qualifier("errorLog")
+ public Logger errorLogger() {
+ return errorLog;
+ }
+
+ @Bean
+ public LinkedBlockingQueue<JSONObject> inputQueue() {
+ return fProcessingInputQueue;
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/AnyNode.java b/src/main/java/org/onap/dcae/commonFunction/AnyNode.java
index 97d73ddd..7be45b0c 100644
--- a/src/main/java/org/onap/dcae/commonFunction/AnyNode.java
+++ b/src/main/java/org/onap/dcae/commonFunction/AnyNode.java
@@ -19,16 +19,17 @@
*/
package org.onap.dcae.commonFunction;
-import static io.vavr.API.Set;
-
import io.vavr.collection.List;
import io.vavr.collection.Set;
import io.vavr.control.Option;
-import java.util.stream.StreamSupport;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
+import java.util.stream.StreamSupport;
+
+import static io.vavr.API.Set;
+
/**
* This class is a wrapper for 2 most used entities of org.json lib: JSONArray and JSONObject and comprises utility
* methods for fast access of json structures without need to explicitly coerce between them. While using this, bear in
@@ -109,5 +110,4 @@ public class AnyNode {
return (JSONObject) this.obj;
}
-
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java
deleted file mode 100644
index 36713aa4..00000000
--- a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.commonFunction;
-
-import com.att.nsa.apiServer.ApiServer;
-import com.att.nsa.apiServer.ApiServerConnector;
-import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.loadException;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.github.fge.jackson.JsonLoader;
-import com.github.fge.jsonschema.core.exceptions.ProcessingException;
-import com.github.fge.jsonschema.core.report.ProcessingMessage;
-import com.github.fge.jsonschema.core.report.ProcessingReport;
-import com.github.fge.jsonschema.main.JsonSchema;
-import com.github.fge.jsonschema.main.JsonSchemaFactory;
-import org.apache.catalina.LifecycleException;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.onap.dcae.ApplicationSettings;
-import org.onap.dcae.CLIUtils;
-import org.onap.dcae.commonFunction.event.publishing.DMaaPConfigurationParser;
-import org.onap.dcae.commonFunction.event.publishing.EventPublisher;
-import org.onap.dcae.restapi.RestfulCollectorServlet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-
-public class CommonStartup extends NsaBaseEndpoint implements Runnable {
-
- private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");
- public static final Logger inlog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.input");
- static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.output");
- public static final Logger eplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.error");
-
- static int maxQueueEvent = 1024 * 4;
- public static boolean schemaValidatorflag = false;
- public static boolean authflag = false;
- static boolean eventTransformFlag = true;
- public static JSONObject schemaFileJson;
- static String cambriaConfigFile;
- public static io.vavr.collection.Map<String , String [] > streamID;
-
- static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;
- private static ApiServer fTomcatServer = null;
- private static final Logger log = LoggerFactory.getLogger(CommonStartup.class);
-
- private CommonStartup(ApplicationSettings settings) throws loadException, IOException, rrNvReadable.missingReqdSetting {
- final List<ApiServerConnector> connectors = new LinkedList<>();
-
- if (!settings.authorizationEnabled()) {
- connectors.add(new ApiServerConnector.Builder(settings.httpPort()).secure(false).build());
- }
-
- final int securePort = settings.httpsPort();
- final String keystoreFile = settings.keystoreFileLocation();
- final String keystorePasswordFile = settings.keystorePasswordFileLocation();
- final String keyAlias = settings.keystoreAlias();
-
- if (settings.authorizationEnabled()) {
- String keystorePassword = readFile(keystorePasswordFile);
- connectors.add(new ApiServerConnector.Builder(securePort).secure(true)
- .keystorePassword(keystorePassword).keystoreFile(keystoreFile).keyAlias(keyAlias).build());
-
- }
-
- schemaValidatorflag = settings.jsonSchemaValidationEnabled();
- maxQueueEvent = settings.maximumAllowedQueuedEvents();
- if (schemaValidatorflag) {
- schemaFileJson = settings.jsonSchema();
-
- }
- authflag = settings.authorizationEnabled();
- cambriaConfigFile = settings.cambriaConfigurationFileLocation();
- streamID = settings.dMaaPStreamsMapping();
- eventTransformFlag = settings.eventTransformingEnabled();
-
- fTomcatServer = new ApiServer.Builder(connectors, new RestfulCollectorServlet(settings)).encodeSlashes(true)
- .name("collector").build();
- }
-
- public static void main(String[] args) {
- try {
-
- fProcessingInputQueue = new LinkedBlockingQueue<>(CommonStartup.maxQueueEvent);
-
- VESLogger.setUpEcompLogging();
-
- CommonStartup cs = new CommonStartup(new ApplicationSettings(args, CLIUtils::processCmdLine));
-
- Thread commonStartupThread = new Thread(cs);
- commonStartupThread.start();
-
- EventProcessor ep = new EventProcessor(EventPublisher.createPublisher(oplog,
- DMaaPConfigurationParser
- .parseToDomainMapping(Paths.get(cambriaConfigFile))
- .get()));
- ExecutorService executor = Executors.newFixedThreadPool(20);
- for (int i = 0; i < 20; ++i) {
- executor.execute(ep);
- }
- } catch (Exception e) {
- CommonStartup.eplog.error("Fatal error during application startup", e);
- throw new RuntimeException(e);
- }
- }
-
- public void run() {
- try {
- fTomcatServer.start();
- fTomcatServer.await();
- } catch (LifecycleException | IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public static class QueueFullException extends Exception {
-
- private static final long serialVersionUID = 1L;
- }
-
- public static void handleEvents(JSONArray a) throws QueueFullException, JSONException {
- CommonStartup.metriclog.info("EVENT_PUBLISH_START");
- for (int i = 0; i < a.length(); i++) {
- if (!fProcessingInputQueue.offer(a.getJSONObject(i))) {
- throw new QueueFullException();
- }
- }
- log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
- CommonStartup.metriclog.info("EVENT_PUBLISH_END");
- }
-
- private static String readFile(String path) throws IOException {
- byte[] encoded = Files.readAllBytes(Paths.get(path));
- String pwd = new String(encoded);
- return pwd.substring(0, pwd.length() - 1);
- }
-
- public static String validateAgainstSchema(String jsonData, String jsonSchema) {
- ProcessingReport report;
- String result = "false";
-
- try {
- log.trace("Schema validation for event:" + jsonData);
- JsonNode schemaNode = JsonLoader.fromString(jsonSchema);
- JsonNode data = JsonLoader.fromString(jsonData);
- JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
- JsonSchema schema = factory.getJsonSchema(schemaNode);
- report = schema.validate(data);
- } catch (JsonParseException e) {
- log.error("validateAgainstSchema:JsonParseException for event:" + jsonData);
- return e.getMessage();
- } catch (ProcessingException e) {
- log.error("validateAgainstSchema:Processing exception for event:" + jsonData);
- return e.getMessage();
- } catch (IOException e) {
- log.error(
- "validateAgainstSchema:IO exception; something went wrong trying to read json data for event:" + jsonData);
- return e.getMessage();
- }
- if (report != null) {
- for (ProcessingMessage pm : report) {
- log.trace("Processing Message: " + pm.getMessage());
- }
- result = String.valueOf(report.isSuccess());
- }
- try {
- log.debug("Validation Result:" + result + " Validation report:" + report);
- } catch (NullPointerException e) {
- log.error("validateAgainstSchema:NullpointerException on report");
- }
- return result;
- }
-
-
-}
diff --git a/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java b/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java
index a6de0fc8..e3d59098 100644
--- a/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java
+++ b/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java
@@ -8,9 +8,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,483 +21,483 @@
package org.onap.dcae.commonFunction;
-import java.text.DecimalFormat;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ConfigProcessors {
-
- private static final Logger log = LoggerFactory.getLogger(ConfigProcessors.class);
- private static final String FIELD = "field";
- private static final String OLD_FIELD = "oldField";
- private static final String FILTER = "filter";
- private static final String VALUE = "value";
- private static final String REGEX = "\\[\\]";
- private static final String OBJECT_NOT_FOUND = "ObjectNotFound";
- private static final String FILTER_NOT_MET = "Filter not met";
- private static final String COMP_FALSE = "==false";
-
- private final JSONObject event;
-
- public ConfigProcessors(JSONObject eventJson) {
- event = eventJson;
- }
-
- public void getValue(JSONObject jsonObject) {
-
- final String field = jsonObject.getString(FIELD);
- final JSONObject filter = jsonObject.optJSONObject(FILTER);
-
- if (filter == null || isFilterMet(filter)) {
- getEventObjectVal(field);
- } else
- log.info(FILTER_NOT_MET);
- }
+import java.text.DecimalFormat;
+public class ConfigProcessors {
- public void setValue(JSONObject jsonObject) {
- final String field = jsonObject.getString(FIELD);
- final String value = jsonObject.getString(VALUE);
- final JSONObject filter = jsonObject.optJSONObject(FILTER);
- if (filter == null || isFilterMet(filter)) {
- setEventObjectVal(field, value);
- } else
- log.info(FILTER_NOT_MET);
- }
+ private static final Logger log = LoggerFactory.getLogger(ConfigProcessors.class);
+ private static final String FIELD = "field";
+ private static final String OLD_FIELD = "oldField";
+ private static final String FILTER = "filter";
+ private static final String VALUE = "value";
+ private static final String REGEX = "\\[\\]";
+ private static final String OBJECT_NOT_FOUND = "ObjectNotFound";
+ private static final String FILTER_NOT_MET = "Filter not met";
+ private static final String COMP_FALSE = "==false";
+ private final JSONObject event;
+ public ConfigProcessors(JSONObject eventJson) {
+ event = eventJson;
+ }
- private String evaluate(String str) {
- String value = str;
- if (str.startsWith("$")) {
- value = (String) getEventObjectVal(str.substring(1));
+ public void getValue(JSONObject jsonObject) {
- }
- return value;
- }
+ final String field = jsonObject.getString(FIELD);
+ final JSONObject filter = jsonObject.optJSONObject(FILTER);
+ if (filter == null || isFilterMet(filter)) {
+ getEventObjectVal(field);
+ } else
+ log.info(FILTER_NOT_MET);
+ }
- public void suppressEvent(JSONObject jsonObject) {
- final JSONObject filter = jsonObject.optJSONObject(FILTER);
- if (filter == null || isFilterMet(filter)) {
- setEventObjectVal("suppressEvent", "true");
- } else
- log.info(FILTER_NOT_MET);
- }
+ public void setValue(JSONObject jsonObject) {
+ final String field = jsonObject.getString(FIELD);
+ final String value = jsonObject.getString(VALUE);
+ final JSONObject filter = jsonObject.optJSONObject(FILTER);
+ if (filter == null || isFilterMet(filter)) {
+ setEventObjectVal(field, value);
+ } else
+ log.info(FILTER_NOT_MET);
+ }
- public void addAttribute(JSONObject jsonObject) {
+ private String evaluate(String str) {
+ String value = str;
+ if (str.startsWith("$")) {
+ value = (String) getEventObjectVal(str.substring(1));
+
+ }
+ return value;
+ }
- final String field = jsonObject.getString(FIELD);
- final String value = evaluate(jsonObject.getString(VALUE));
- final JSONObject filter = jsonObject.optJSONObject(FILTER);
- final String fieldType = jsonObject.optString("fieldType", "string").toLowerCase();
- if (filter == null || isFilterMet(filter)) {
- setEventObjectVal(field, value, fieldType);
- } else
- log.info(FILTER_NOT_MET);
- }
+ public void suppressEvent(JSONObject jsonObject) {
+ final JSONObject filter = jsonObject.optJSONObject(FILTER);
+ if (filter == null || isFilterMet(filter)) {
+ setEventObjectVal("suppressEvent", "true");
+ } else
+ log.info(FILTER_NOT_MET);
+ }
- public void updateAttribute(JSONObject jsonObject) {
- final String field = jsonObject.getString(FIELD);
- final String value = evaluate(jsonObject.getString(VALUE));
- final JSONObject filter = jsonObject.optJSONObject(FILTER);
- if (filter == null || isFilterMet(filter)) {
- setEventObjectVal(field, value);
- } else
- log.info(FILTER_NOT_MET);
- }
+ public void addAttribute(JSONObject jsonObject) {
+
+ final String field = jsonObject.getString(FIELD);
+ final String value = evaluate(jsonObject.getString(VALUE));
+ final JSONObject filter = jsonObject.optJSONObject(FILTER);
+ final String fieldType = jsonObject.optString("fieldType", "string").toLowerCase();
+
+ if (filter == null || isFilterMet(filter)) {
+ setEventObjectVal(field, value, fieldType);
+ } else
+ log.info(FILTER_NOT_MET);
+ }
- public void removeAttribute(JSONObject jsonObject) {
-
- final String field = jsonObject.getString(FIELD);
- final JSONObject filter = jsonObject.optJSONObject(FILTER);
-
- if (filter == null || isFilterMet(filter)) {
- removeEventKey(field);
- } else
- log.info(FILTER_NOT_MET);
- }
-
-
- private void renameArrayInArray(JSONObject jsonObject) // map
- {
- log.info("renameArrayInArray");
- final String field = jsonObject.getString(FIELD);
- final String oldField = jsonObject.getString(OLD_FIELD);
- final JSONObject filter = jsonObject.optJSONObject(FILTER);
-
- if (filter == null || isFilterMet(filter)) {
-
- final String[] fsplit = field.split(REGEX, field.length());
- final String[] oldfsplit = oldField.split(REGEX, oldField.length());
-
- final String oldValue = getEventObjectVal(oldfsplit[0]).toString();
- if (!oldValue.equals(OBJECT_NOT_FOUND)) {
- final String oldArrayName = oldfsplit[1].substring(1);
- final String newArrayName = fsplit[1].substring(1);
- final String value = oldValue.replaceAll(oldArrayName, newArrayName);
-
- log.info("oldValue ==" + oldValue);
- log.info("value ==" + value);
- JSONArray ja = new JSONArray(value);
- removeEventKey(oldfsplit[0]);
- setEventObjectVal(fsplit[0], ja);
- }
- } else
- log.info(FILTER_NOT_MET);
- }
-
-
- public void map(JSONObject jsonObject) {
-
- final String field = jsonObject.getString(FIELD);
- if (field.contains("[]")) {
- if (field.matches(".*\\[\\]\\..*\\[\\]"))
- renameArrayInArray(jsonObject);
- else
- mapToJArray(jsonObject);
- } else
- mapAttribute(jsonObject);
- }
-
- private String performOperation(String operation, String value) {
- log.info("performOperation");
- if ("convertMBtoKB".equals(operation)) {
- float kbValue = Float.parseFloat(value) * 1024;
- value = String.valueOf(kbValue);
- }
- return value;
- }
-
-
- public void mapAttribute(JSONObject jsonObject) {
-
- final String field = jsonObject.getString(FIELD);
- final String oldField = jsonObject.getString(OLD_FIELD);
- final JSONObject filter = jsonObject.optJSONObject(FILTER);
- final String operation = jsonObject.optString("operation");
- String value;
- if (filter == null || isFilterMet(filter)) {
-
- value = getEventObjectVal(oldField).toString();
- if (!value.equals(OBJECT_NOT_FOUND)) {
- if (operation != null && !operation.isEmpty())
- value = performOperation(operation, value);
-
- setEventObjectVal(field, value);
-
- removeEventKey(oldField);
- }
- } else
- log.info(FILTER_NOT_MET);
- }
-
-
- private void mapToJArray(JSONObject jsonObject) {
- log.info("mapToJArray");
- String field = jsonObject.getString(FIELD);
- String oldField = jsonObject.getString(OLD_FIELD);
- final JSONObject filter = jsonObject.optJSONObject(FILTER);
- final JSONObject attrMap = jsonObject.optJSONObject("attrMap");
- oldField = oldField.replaceAll(REGEX, "");
- field = field.replaceAll(REGEX, "");
-
- if (filter == null || isFilterMet(filter)) {
-
- String value = getEventObjectVal(oldField).toString();
- if (!value.equals(OBJECT_NOT_FOUND)) {
- log.info("old value ==" + value);
- // update old value based on attrMap
- if (attrMap != null) {
- // loop thru attrMap and update attribute name to new name
- for (String key : attrMap.keySet()) {
- value = value.replaceAll(key, attrMap.getString(key));
- }
- }
-
- log.info("new value ==" + value);
- char c = value.charAt(0);
- if (c != '[') {
- // oldfield is JsonObject
- JSONObject valueJO = new JSONObject(value);
- // if the array already exists
- String existingValue = getEventObjectVal(field).toString();
- if (!existingValue.equals(OBJECT_NOT_FOUND)) {
- JSONArray ja = new JSONArray(existingValue);
- JSONObject jo = ja.optJSONObject(0);
- if (jo != null) {
- for (String key : valueJO.keySet()) {
- jo.put(key, valueJO.get(key));
-
- }
- ja.put(0, jo);
-
- setEventObjectVal(field, ja);
- }
- } else // if new array
- setEventObjectVal(field + "[0]", new JSONObject(value), "JArray");
- } else // oldfield is jsonArray
- setEventObjectVal(field, new JSONArray(value));
-
- removeEventKey(oldField);
- }
- } else
- log.info(FILTER_NOT_MET);
- }
-
- /**
- * example - { "functionName": "concatenateValue", "args":{ "filter":
- * {"event.commonEventHeader.event":"heartbeat"},
- * FIELD:"event.commonEventHeader.eventName", "concatenate":
- * ["event.commonEventHeader.domain","event.commonEventHeader.eventType","event.commonEventHeader.alarmCondition"],
- * "delimiter":"_" } }
- **/
- public void concatenateValue(JSONObject jsonObject) {
-
- final String field = jsonObject.getString(FIELD);
- final String delimiter = jsonObject.getString("delimiter");
- final JSONArray values = jsonObject.getJSONArray("concatenate");
- final JSONObject filter = jsonObject.optJSONObject(FILTER);
- if (filter == null || isFilterMet(filter)) {
- StringBuilder value = new StringBuilder();
- for (int i = 0; i < values.length(); i++) {
-
- String tempVal = evaluate(values.getString(i));
- if (!tempVal.equals(OBJECT_NOT_FOUND)) {
- if (i == 0)
- value.append(tempVal);
- else
- value.append(delimiter).append(tempVal);
- }
- }
-
- setEventObjectVal(field, value.toString());
- } else
- log.info(FILTER_NOT_MET);
- }
-
- public void subtractValue(JSONObject jsonObject) {
-
- final String field = jsonObject.getString(FIELD);
- final JSONArray values = jsonObject.getJSONArray("subtract");
- final JSONObject filter = jsonObject.optJSONObject(FILTER);
- if (filter == null || isFilterMet(filter)) {
- float value = 0;
- for (int i = 0; i < values.length(); i++) {
- log.info(values.getString(i));
- String tempVal = evaluate(values.getString(i));
- log.info("tempVal==" + tempVal);
- if (!tempVal.equals(OBJECT_NOT_FOUND)) {
- if (i == 0)
- value = value + Float.valueOf(tempVal);
- else
- value = value - Float.valueOf(tempVal);
- }
- }
- log.info("value ==" + value);
- setEventObjectVal(field, value, "number");
- } else
- log.info(FILTER_NOT_MET);
- }
-
-
- private void removeEventKey(String field) {
- String[] keySet = field.split("\\.", field.length());
- JSONObject keySeries = event;
- for (int i = 0; i < (keySet.length - 1); i++) {
-
- keySeries = keySeries.getJSONObject(keySet[i]);
- }
-
- keySeries.remove(keySet[keySet.length - 1]);
- }
-
-
- private boolean checkFilter(JSONObject jo, String key, String logicKey) {
- String filterValue = jo.getString(key);
- if (filterValue.contains(":")) {
- String[] splitVal = filterValue.split(":");
- if ("matches".equals(splitVal[0])) {
- if ("not".equals(logicKey)) {
- if (getEventObjectVal(key).toString().matches(splitVal[1])) {
- log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE);
- return false;
- }
- } else {
- if (!(getEventObjectVal(key).toString().matches(splitVal[1]))) {
- log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE);
- return false;
- }
- }
-
- }
- if ("contains".equals(splitVal[0])) {
- if ("not".equals(logicKey)) {
- if (getEventObjectVal(key).toString().contains(splitVal[1])) {
- log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE);
- return false;
- }
- } else {
- if (!(getEventObjectVal(key).toString().contains(splitVal[1]))) {
- log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE);
- return false;
- }
- }
-
- }
- } else {
- if ("not".equals(logicKey)) {
- if (getEventObjectVal(key).toString().equals(filterValue)) {
- log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE);
- return false;
- }
- } else {
- if (!(getEventObjectVal(key).toString().equals(filterValue))) {
- log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE);
- return false;
- }
- }
- }
- return true;
- }
-
-
- public boolean isFilterMet(JSONObject jo) {
- for (String key : jo.keySet()) {
- if ("not".equals(key)) {
- JSONObject njo = jo.getJSONObject(key);
- for (String njoKey : njo.keySet()) {
- if (!checkFilter(njo, njoKey, key))
- return false;
- }
- } else {
- if (!checkFilter(jo, key, key))
- return false;
- }
- }
- return true;
- }
-
- /**
- * returns a string or JSONObject or JSONArray
- **/
- public Object getEventObjectVal(String keySeriesStr) {
- keySeriesStr = keySeriesStr.replaceAll("\\[", ".");
- keySeriesStr = keySeriesStr.replaceAll("\\]", ".");
- if (keySeriesStr.contains("..")) {
- keySeriesStr = keySeriesStr.replaceAll("\\.\\.", ".");
- }
-
- if (keySeriesStr.lastIndexOf(".") == keySeriesStr.length() - 1)
- keySeriesStr = keySeriesStr.substring(0, keySeriesStr.length() - 1);
- String[] keySet = keySeriesStr.split("\\.", keySeriesStr.length());
- Object keySeriesObj = event;
- for (String aKeySet : keySet) {
- if (keySeriesObj != null) {
- if (keySeriesObj instanceof String) {
-
- log.info("STRING==" + keySeriesObj);
- } else if (keySeriesObj instanceof JSONArray) {
- keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(aKeySet));
-
- } else if (keySeriesObj instanceof JSONObject) {
- keySeriesObj = ((JSONObject) keySeriesObj).opt(aKeySet);
-
- } else {
- log.info("unknown object==" + keySeriesObj);
- }
- }
- }
-
- if (keySeriesObj == null)
- return OBJECT_NOT_FOUND;
- return keySeriesObj;
- }
-
- public void setEventObjectVal(String keySeriesStr, Object value) {
- setEventObjectVal(keySeriesStr, value, "string");
- }
-
- /**
- * returns a string or JSONObject or JSONArray
- **/
- public void setEventObjectVal(String keySeriesStr, Object value, String fieldType) {
- keySeriesStr = keySeriesStr.replaceAll("\\[", ".");
- keySeriesStr = keySeriesStr.replaceAll("\\]", ".");
- if (keySeriesStr.contains("..")) {
- keySeriesStr = keySeriesStr.replaceAll("\\.\\.", ".");
- }
- log.info("fieldType==" + fieldType);
-
- if (keySeriesStr.lastIndexOf(".") == keySeriesStr.length() - 1)
- keySeriesStr = keySeriesStr.substring(0, keySeriesStr.length() - 1);
- String[] keySet = keySeriesStr.split("\\.", keySeriesStr.length());
- Object keySeriesObj = event;
- for (int i = 0; i < (keySet.length - 1); i++) {
-
- if (keySeriesObj instanceof JSONArray) {
-
- if (((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i])) == null) // if
- // the
- // object
- // is
- // not
- // there
- // then
- // add
- // it
- {
- log.info("Object is null, must add it");
- if (keySet[i + 1].matches("[0-9]*")) // if index then array
- ((JSONArray) keySeriesObj).put(Integer.parseInt(keySet[i]), new JSONArray());
- else
- ((JSONArray) keySeriesObj).put(Integer.parseInt(keySet[i]), new JSONObject());
- }
- keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i]));
-
- } else if (keySeriesObj instanceof JSONObject) {
- if (((JSONObject) keySeriesObj).opt(keySet[i]) == null) // if
- // the
- // object
- // is
- // not
- // there
- // then
- // add
- // it
- {
- if (keySet[i + 1].matches("[0-9]*")) // if index then array
- ((JSONObject) keySeriesObj).put(keySet[i], new JSONArray());
- else
- ((JSONObject) keySeriesObj).put(keySet[i], new JSONObject());
- log.info("Object is null, must add it");
- }
- keySeriesObj = ((JSONObject) keySeriesObj).opt(keySet[i]);
- } else {
- log.info("unknown object==" + keySeriesObj);
- }
- }
- if ("number".equals(fieldType)) {
- DecimalFormat df = new DecimalFormat("#.0");
- if (value instanceof String)
- ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1],
- Float.valueOf(df.format(Float.valueOf((String) value))));
- else
- ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], Float.valueOf(df.format(value)));
- } else if ("integer".equals(fieldType) && value instanceof String)
- ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], Integer.valueOf((String) value));
- else if ("JArray".equals(fieldType))
- ((JSONArray) keySeriesObj).put(value);
- else
- ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], value);
-
- }
+ public void updateAttribute(JSONObject jsonObject) {
+
+ final String field = jsonObject.getString(FIELD);
+ final String value = evaluate(jsonObject.getString(VALUE));
+ final JSONObject filter = jsonObject.optJSONObject(FILTER);
+ if (filter == null || isFilterMet(filter)) {
+ setEventObjectVal(field, value);
+ } else
+ log.info(FILTER_NOT_MET);
+ }
+
+
+ public void removeAttribute(JSONObject jsonObject) {
+
+ final String field = jsonObject.getString(FIELD);
+ final JSONObject filter = jsonObject.optJSONObject(FILTER);
+
+ if (filter == null || isFilterMet(filter)) {
+ removeEventKey(field);
+ } else
+ log.info(FILTER_NOT_MET);
+ }
+
+
+ private void renameArrayInArray(JSONObject jsonObject) // map
+ {
+ log.info("renameArrayInArray");
+ final String field = jsonObject.getString(FIELD);
+ final String oldField = jsonObject.getString(OLD_FIELD);
+ final JSONObject filter = jsonObject.optJSONObject(FILTER);
+
+ if (filter == null || isFilterMet(filter)) {
+
+ final String[] fsplit = field.split(REGEX, field.length());
+ final String[] oldfsplit = oldField.split(REGEX, oldField.length());
+
+ final String oldValue = getEventObjectVal(oldfsplit[0]).toString();
+ if (!oldValue.equals(OBJECT_NOT_FOUND)) {
+ final String oldArrayName = oldfsplit[1].substring(1);
+ final String newArrayName = fsplit[1].substring(1);
+ final String value = oldValue.replaceAll(oldArrayName, newArrayName);
+
+ log.info("oldValue ==" + oldValue);
+ log.info("value ==" + value);
+ JSONArray ja = new JSONArray(value);
+ removeEventKey(oldfsplit[0]);
+ setEventObjectVal(fsplit[0], ja);
+ }
+ } else
+ log.info(FILTER_NOT_MET);
+ }
+
+
+ public void map(JSONObject jsonObject) {
+
+ final String field = jsonObject.getString(FIELD);
+ if (field.contains("[]")) {
+ if (field.matches(".*\\[\\]\\..*\\[\\]"))
+ renameArrayInArray(jsonObject);
+ else
+ mapToJArray(jsonObject);
+ } else
+ mapAttribute(jsonObject);
+ }
+
+ private String performOperation(String operation, String value) {
+ log.info("performOperation");
+ if ("convertMBtoKB".equals(operation)) {
+ float kbValue = Float.parseFloat(value) * 1024;
+ value = String.valueOf(kbValue);
+ }
+ return value;
+ }
+
+
+ public void mapAttribute(JSONObject jsonObject) {
+
+ final String field = jsonObject.getString(FIELD);
+ final String oldField = jsonObject.getString(OLD_FIELD);
+ final JSONObject filter = jsonObject.optJSONObject(FILTER);
+ final String operation = jsonObject.optString("operation");
+ String value;
+ if (filter == null || isFilterMet(filter)) {
+
+ value = getEventObjectVal(oldField).toString();
+ if (!value.equals(OBJECT_NOT_FOUND)) {
+ if (operation != null && !operation.isEmpty())
+ value = performOperation(operation, value);
+
+ setEventObjectVal(field, value);
+
+ removeEventKey(oldField);
+ }
+ } else
+ log.info(FILTER_NOT_MET);
+ }
+
+
+ private void mapToJArray(JSONObject jsonObject) {
+ log.info("mapToJArray");
+ String field = jsonObject.getString(FIELD);
+ String oldField = jsonObject.getString(OLD_FIELD);
+ final JSONObject filter = jsonObject.optJSONObject(FILTER);
+ final JSONObject attrMap = jsonObject.optJSONObject("attrMap");
+ oldField = oldField.replaceAll(REGEX, "");
+ field = field.replaceAll(REGEX, "");
+
+ if (filter == null || isFilterMet(filter)) {
+
+ String value = getEventObjectVal(oldField).toString();
+ if (!value.equals(OBJECT_NOT_FOUND)) {
+ log.info("old value ==" + value);
+ // update old value based on attrMap
+ if (attrMap != null) {
+ // loop thru attrMap and update attribute name to new name
+ for (String key : attrMap.keySet()) {
+ value = value.replaceAll(key, attrMap.getString(key));
+ }
+ }
+
+ log.info("new value ==" + value);
+ char c = value.charAt(0);
+ if (c != '[') {
+ // oldfield is JsonObject
+ JSONObject valueJO = new JSONObject(value);
+ // if the array already exists
+ String existingValue = getEventObjectVal(field).toString();
+ if (!existingValue.equals(OBJECT_NOT_FOUND)) {
+ JSONArray ja = new JSONArray(existingValue);
+ JSONObject jo = ja.optJSONObject(0);
+ if (jo != null) {
+ for (String key : valueJO.keySet()) {
+ jo.put(key, valueJO.get(key));
+
+ }
+ ja.put(0, jo);
+
+ setEventObjectVal(field, ja);
+ }
+ } else // if new array
+ setEventObjectVal(field + "[0]", new JSONObject(value), "JArray");
+ } else // oldfield is jsonArray
+ setEventObjectVal(field, new JSONArray(value));
+
+ removeEventKey(oldField);
+ }
+ } else
+ log.info(FILTER_NOT_MET);
+ }
+
+ /**
+ * example - { "functionName": "concatenateValue", "args":{ "filter":
+ * {"event.commonEventHeader.event":"heartbeat"},
+ * FIELD:"event.commonEventHeader.eventName", "concatenate":
+ * ["event.commonEventHeader.domain","event.commonEventHeader.eventType","event.commonEventHeader.alarmCondition"],
+ * "delimiter":"_" } }
+ **/
+ public void concatenateValue(JSONObject jsonObject) {
+
+ final String field = jsonObject.getString(FIELD);
+ final String delimiter = jsonObject.getString("delimiter");
+ final JSONArray values = jsonObject.getJSONArray("concatenate");
+ final JSONObject filter = jsonObject.optJSONObject(FILTER);
+ if (filter == null || isFilterMet(filter)) {
+ StringBuilder value = new StringBuilder();
+ for (int i = 0; i < values.length(); i++) {
+
+ String tempVal = evaluate(values.getString(i));
+ if (!tempVal.equals(OBJECT_NOT_FOUND)) {
+ if (i == 0)
+ value.append(tempVal);
+ else
+ value.append(delimiter).append(tempVal);
+ }
+ }
+
+ setEventObjectVal(field, value.toString());
+ } else
+ log.info(FILTER_NOT_MET);
+ }
+
+ public void subtractValue(JSONObject jsonObject) {
+
+ final String field = jsonObject.getString(FIELD);
+ final JSONArray values = jsonObject.getJSONArray("subtract");
+ final JSONObject filter = jsonObject.optJSONObject(FILTER);
+ if (filter == null || isFilterMet(filter)) {
+ float value = 0;
+ for (int i = 0; i < values.length(); i++) {
+ log.info(values.getString(i));
+ String tempVal = evaluate(values.getString(i));
+ log.info("tempVal==" + tempVal);
+ if (!tempVal.equals(OBJECT_NOT_FOUND)) {
+ if (i == 0)
+ value = value + Float.valueOf(tempVal);
+ else
+ value = value - Float.valueOf(tempVal);
+ }
+ }
+ log.info("value ==" + value);
+ setEventObjectVal(field, value, "number");
+ } else
+ log.info(FILTER_NOT_MET);
+ }
+
+
+ private void removeEventKey(String field) {
+ String[] keySet = field.split("\\.", field.length());
+ JSONObject keySeries = event;
+ for (int i = 0; i < (keySet.length - 1); i++) {
+
+ keySeries = keySeries.getJSONObject(keySet[i]);
+ }
+
+ keySeries.remove(keySet[keySet.length - 1]);
+ }
+
+
+ private boolean checkFilter(JSONObject jo, String key, String logicKey) {
+ String filterValue = jo.getString(key);
+ if (filterValue.contains(":")) {
+ String[] splitVal = filterValue.split(":");
+ if ("matches".equals(splitVal[0])) {
+ if ("not".equals(logicKey)) {
+ if (getEventObjectVal(key).toString().matches(splitVal[1])) {
+ log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE);
+ return false;
+ }
+ } else {
+ if (!(getEventObjectVal(key).toString().matches(splitVal[1]))) {
+ log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE);
+ return false;
+ }
+ }
+
+ }
+ if ("contains".equals(splitVal[0])) {
+ if ("not".equals(logicKey)) {
+ if (getEventObjectVal(key).toString().contains(splitVal[1])) {
+ log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE);
+ return false;
+ }
+ } else {
+ if (!(getEventObjectVal(key).toString().contains(splitVal[1]))) {
+ log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE);
+ return false;
+ }
+ }
+
+ }
+ } else {
+ if ("not".equals(logicKey)) {
+ if (getEventObjectVal(key).toString().equals(filterValue)) {
+ log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE);
+ return false;
+ }
+ } else {
+ if (!(getEventObjectVal(key).toString().equals(filterValue))) {
+ log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE);
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+
+ public boolean isFilterMet(JSONObject jo) {
+ for (String key : jo.keySet()) {
+ if ("not".equals(key)) {
+ JSONObject njo = jo.getJSONObject(key);
+ for (String njoKey : njo.keySet()) {
+ if (!checkFilter(njo, njoKey, key))
+ return false;
+ }
+ } else {
+ if (!checkFilter(jo, key, key))
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * returns a string or JSONObject or JSONArray
+ **/
+ public Object getEventObjectVal(String keySeriesStr) {
+ keySeriesStr = keySeriesStr.replaceAll("\\[", ".");
+ keySeriesStr = keySeriesStr.replaceAll("\\]", ".");
+ if (keySeriesStr.contains("..")) {
+ keySeriesStr = keySeriesStr.replaceAll("\\.\\.", ".");
+ }
+
+ if (keySeriesStr.lastIndexOf(".") == keySeriesStr.length() - 1)
+ keySeriesStr = keySeriesStr.substring(0, keySeriesStr.length() - 1);
+ String[] keySet = keySeriesStr.split("\\.", keySeriesStr.length());
+ Object keySeriesObj = event;
+ for (String aKeySet : keySet) {
+ if (keySeriesObj != null) {
+ if (keySeriesObj instanceof String) {
+
+ log.info("STRING==" + keySeriesObj);
+ } else if (keySeriesObj instanceof JSONArray) {
+ keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(aKeySet));
+
+ } else if (keySeriesObj instanceof JSONObject) {
+ keySeriesObj = ((JSONObject) keySeriesObj).opt(aKeySet);
+
+ } else {
+ log.info("unknown object==" + keySeriesObj);
+ }
+ }
+ }
+
+ if (keySeriesObj == null)
+ return OBJECT_NOT_FOUND;
+ return keySeriesObj;
+ }
+
+ public void setEventObjectVal(String keySeriesStr, Object value) {
+ setEventObjectVal(keySeriesStr, value, "string");
+ }
+
+ /**
+ * returns a string or JSONObject or JSONArray
+ **/
+ public void setEventObjectVal(String keySeriesStr, Object value, String fieldType) {
+ keySeriesStr = keySeriesStr.replaceAll("\\[", ".");
+ keySeriesStr = keySeriesStr.replaceAll("\\]", ".");
+ if (keySeriesStr.contains("..")) {
+ keySeriesStr = keySeriesStr.replaceAll("\\.\\.", ".");
+ }
+ log.info("fieldType==" + fieldType);
+
+ if (keySeriesStr.lastIndexOf(".") == keySeriesStr.length() - 1)
+ keySeriesStr = keySeriesStr.substring(0, keySeriesStr.length() - 1);
+ String[] keySet = keySeriesStr.split("\\.", keySeriesStr.length());
+ Object keySeriesObj = event;
+ for (int i = 0; i < (keySet.length - 1); i++) {
+
+ if (keySeriesObj instanceof JSONArray) {
+
+ if (((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i])) == null) // if
+ // the
+ // object
+ // is
+ // not
+ // there
+ // then
+ // add
+ // it
+ {
+ log.info("Object is null, must add it");
+ if (keySet[i + 1].matches("[0-9]*")) // if index then array
+ ((JSONArray) keySeriesObj).put(Integer.parseInt(keySet[i]), new JSONArray());
+ else
+ ((JSONArray) keySeriesObj).put(Integer.parseInt(keySet[i]), new JSONObject());
+ }
+ keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i]));
+
+ } else if (keySeriesObj instanceof JSONObject) {
+ if (((JSONObject) keySeriesObj).opt(keySet[i]) == null) // if
+ // the
+ // object
+ // is
+ // not
+ // there
+ // then
+ // add
+ // it
+ {
+ if (keySet[i + 1].matches("[0-9]*")) // if index then array
+ ((JSONObject) keySeriesObj).put(keySet[i], new JSONArray());
+ else
+ ((JSONObject) keySeriesObj).put(keySet[i], new JSONObject());
+ log.info("Object is null, must add it");
+ }
+ keySeriesObj = ((JSONObject) keySeriesObj).opt(keySet[i]);
+ } else {
+ log.info("unknown object==" + keySeriesObj);
+ }
+ }
+ if ("number".equals(fieldType)) {
+ DecimalFormat df = new DecimalFormat("#.0");
+ if (value instanceof String)
+ ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1],
+ Float.valueOf(df.format(Float.valueOf((String) value))));
+ else
+ ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], Float.valueOf(df.format(value)));
+ } else if ("integer".equals(fieldType) && value instanceof String)
+ ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], Integer.valueOf((String) value));
+ else if ("JArray".equals(fieldType))
+ ((JSONArray) keySeriesObj).put(value);
+ else
+ ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], value);
+
+ }
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
index a57ea3f0..7d27399d 100644
--- a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
+++ b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
@@ -25,7 +25,10 @@ import com.att.nsa.logging.LoggingContext;
import com.att.nsa.logging.log4j.EcompFields;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
+import io.vavr.collection.Map;
import org.json.JSONObject;
+import org.onap.dcae.ApplicationSettings;
+import org.onap.dcae.VesApplication;
import org.onap.dcae.commonFunction.event.publishing.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,37 +38,38 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.text.SimpleDateFormat;
-import java.util.Arrays;
import java.util.Date;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-class EventProcessor implements Runnable {
+public class EventProcessor implements Runnable {
+ static final Type EVENT_LIST_TYPE = new TypeToken<List<Event>>() {
+ }.getType();
private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
private static final String EVENT_LITERAL = "event";
private static final String COMMON_EVENT_HEADER = "commonEventHeader";
- static final Type EVENT_LIST_TYPE = new TypeToken<List<Event>>() {}.getType();
private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");
- static Map<String, String[]> streamidHash = new HashMap<>();
public JSONObject event;
private EventPublisher eventPublisher;
+ private Map<String, String[]> streamidHash;
+ private ApplicationSettings properties;
- public EventProcessor(EventPublisher eventPublisher) {
+
+ public EventProcessor(EventPublisher eventPublisher, ApplicationSettings properties) {
this.eventPublisher = eventPublisher;
- streamidHash = CommonStartup.streamID.toJavaMap();
+ this.properties = properties;
+ this.streamidHash = properties.dMaaPStreamsMapping();
}
@Override
public void run() {
try {
while (true) {
- event = CommonStartup.fProcessingInputQueue.take();
+ event = VesApplication.fProcessingInputQueue.take();
// As long as the producer is running we remove elements from
// the queue.
- log.info("QueueSize:" + CommonStartup.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event);
+ log.info("QueueSize:" + VesApplication.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event);
String uuid = event.get("VESuniqueId").toString();
LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
@@ -73,14 +77,12 @@ class EventProcessor implements Runnable {
String domain = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain");
log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + domain);
- String[] streamIdList = streamidHash.get(domain);
- log.debug("streamIdList:" + Arrays.toString(streamIdList));
-
- if (streamIdList.length == 0) {
- log.error("No StreamID defined for publish - Message dropped" + event);
- } else {
- sendEventsToStreams(streamIdList);
- }
+ streamidHash.get(domain)
+ .onEmpty(() -> {
+ log.error("No StreamID defined for publish - Message dropped" + event);
+ }).forEach(streamIds -> {
+ sendEventsToStreams(streamIds);
+ });
log.debug("Message published" + event);
}
} catch (InterruptedException e) {
@@ -93,7 +95,7 @@ class EventProcessor implements Runnable {
// Set collector timestamp in event payload before publish
addCurrentTimeToEvent(event);
- if (CommonStartup.eventTransformFlag) {
+ if (properties.eventTransformingEnabled()) {
// read the mapping json file
try (FileReader fr = new FileReader("./etc/eventTransform.json")) {
log.info("parse eventTransform.json");
@@ -168,5 +170,4 @@ class EventProcessor implements Runnable {
method.invoke(configProcessors, parameter);
}
}
-}
-
+} \ No newline at end of file
diff --git a/src/main/java/org/onap/dcae/commonFunction/VESLogger.java b/src/main/java/org/onap/dcae/commonFunction/VESLogger.java
index a967327e..2a392e81 100644
--- a/src/main/java/org/onap/dcae/commonFunction/VESLogger.java
+++ b/src/main/java/org/onap/dcae/commonFunction/VESLogger.java
@@ -8,9 +8,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -35,128 +35,126 @@ import java.util.UUID;
public class VESLogger {
- public static final String VES_AGENT = "VES_AGENT";
- public static final String REQUEST_ID = "requestId";
- private static final String IP_ADDRESS = "127.0.0.1";
- private static final String HOST_NAME = "localhost";
-
- public static Logger auditLog;
- public static Logger metricsLog;
- public static Logger errorLog;
- public static Logger debugLog;
-
- // Common LoggingContext
- private static LoggingContext commonLC;
- // Thread-specific LoggingContext
- private static LoggingContext threadLC;
- public LoggingContext lc;
-
- /**
- * Returns the common LoggingContext instance that is the base context for
- * all subsequent instances.
- *
- * @return the common LoggingContext
- */
- public static LoggingContext getCommonLoggingContext() {
- if (commonLC == null) {
- commonLC = new LoggingContextFactory.Builder().build();
- final UUID uuid = UUID.randomUUID();
-
- commonLC.put(REQUEST_ID, uuid.toString());
- }
- return commonLC;
- }
-
- /**
- * Get a logging context for the current thread that's based on the common
- * logging context. Populate the context with context-specific values.
- *
- * @param aUuid
- * uuid for request id
- * @return a LoggingContext for the current thread
- */
- public static LoggingContext getLoggingContextForThread(UUID aUuid) {
- // note that this operation requires everything from the common context
- // to be (re)copied into the target context. That seems slow, but it
- // actually
- // helps prevent the thread from overwriting supposedly common data. It
- // also
- // should be fairly quick compared with the overhead of handling the
- // actual
- // service call.
-
- threadLC = new LoggingContextFactory.Builder().withBaseContext(getCommonLoggingContext()).build();
- // Establish the request-specific UUID, as long as we are here...
- threadLC.put(REQUEST_ID, aUuid.toString());
- threadLC.put(EcompFields.kEndTimestamp, SaClock.now());
-
- return threadLC;
- }
-
- /**
- * Get a logging context for the current thread that's based on the common
- * logging context. Populate the context with context-specific values.
- *
- * @param aUuid
- * uuid for request id
- * @return a LoggingContext for the current thread
- */
- public static LoggingContext getLoggingContextForThread(String aUuid) {
- // note that this operation requires everything from the common context
- // to be (re)copied into the target context. That seems slow, but it
- // actually
- // helps prevent the thread from overwriting supposedly common data. It
- // also
- // should be fairly quick compared with the overhead of handling the
- // actual
- // service call.
-
- threadLC = new LoggingContextFactory.Builder().withBaseContext(getCommonLoggingContext()).build();
- // Establish the request-specific UUID, as long as we are here...
- threadLC.put(REQUEST_ID, aUuid);
- threadLC.put("statusCode", "COMPLETE");
- threadLC.put(EcompFields.kEndTimestamp, SaClock.now());
- return threadLC;
- }
-
- public static void setUpEcompLogging() {
-
- // Create ECOMP Logger instances
- auditLog = LoggerFactory.getLogger("com.att.ecomp.audit");
- metricsLog = LoggerFactory.getLogger("com.att.ecomp.metrics");
- debugLog = LoggerFactory.getLogger("com.att.ecomp.debug");
- errorLog = LoggerFactory.getLogger("com.att.ecomp.error");
-
- final LoggingContext lc = getCommonLoggingContext();
-
- String ipAddr = IP_ADDRESS;
- String hostname = HOST_NAME;
- try {
- final InetAddress ip = InetAddress.getLocalHost();
- hostname = ip.getCanonicalHostName();
- ipAddr = ip.getHostAddress();
- } catch (UnknownHostException x) {
- Log.debug(x.getMessage());
- }
-
- lc.put("serverName", hostname);
- lc.put("serviceName", "VESCollecor");
- lc.put("statusCode", "RUNNING");
- lc.put("targetEntity", "NULL");
- lc.put("targetServiceName", "NULL");
- lc.put("server", hostname);
- lc.put("serverIpAddress", ipAddr);
-
- // instance UUID is meaningless here, so we just create a new one each
- // time the
- // server starts. One could argue each new instantiation of the service
- // should
- // have a new instance ID.
- lc.put("instanceUuid", "");
- lc.put("severity", "");
- lc.put(EcompFields.kEndTimestamp, SaClock.now());
- lc.put("EndTimestamp", SaClock.now());
- lc.put("partnerName", "NA");
- }
+ public static final String VES_AGENT = "VES_AGENT";
+ public static final String REQUEST_ID = "requestId";
+ private static final String IP_ADDRESS = "127.0.0.1";
+ private static final String HOST_NAME = "localhost";
+
+ public static Logger auditLog;
+ public static Logger metricsLog;
+ public static Logger errorLog;
+ public static Logger debugLog;
+
+ // Common LoggingContext
+ private static LoggingContext commonLC;
+ // Thread-specific LoggingContext
+ private static LoggingContext threadLC;
+ public LoggingContext lc;
+
+ /**
+ * Returns the common LoggingContext instance that is the base context for
+ * all subsequent instances.
+ *
+ * @return the common LoggingContext
+ */
+ public static LoggingContext getCommonLoggingContext() {
+ if (commonLC == null) {
+ commonLC = new LoggingContextFactory.Builder().build();
+ final UUID uuid = UUID.randomUUID();
+
+ commonLC.put(REQUEST_ID, uuid.toString());
+ }
+ return commonLC;
+ }
+
+ /**
+ * Get a logging context for the current thread that's based on the common
+ * logging context. Populate the context with context-specific values.
+ *
+ * @param aUuid uuid for request id
+ * @return a LoggingContext for the current thread
+ */
+ public static LoggingContext getLoggingContextForThread(UUID aUuid) {
+ // note that this operation requires everything from the common context
+ // to be (re)copied into the target context. That seems slow, but it
+ // actually
+ // helps prevent the thread from overwriting supposedly common data. It
+ // also
+ // should be fairly quick compared with the overhead of handling the
+ // actual
+ // service call.
+
+ threadLC = new LoggingContextFactory.Builder().withBaseContext(getCommonLoggingContext()).build();
+ // Establish the request-specific UUID, as long as we are here...
+ threadLC.put(REQUEST_ID, aUuid.toString());
+ threadLC.put(EcompFields.kEndTimestamp, SaClock.now());
+
+ return threadLC;
+ }
+
+ /**
+ * Get a logging context for the current thread that's based on the common
+ * logging context. Populate the context with context-specific values.
+ *
+ * @param aUuid uuid for request id
+ * @return a LoggingContext for the current thread
+ */
+ public static LoggingContext getLoggingContextForThread(String aUuid) {
+ // note that this operation requires everything from the common context
+ // to be (re)copied into the target context. That seems slow, but it
+ // actually
+ // helps prevent the thread from overwriting supposedly common data. It
+ // also
+ // should be fairly quick compared with the overhead of handling the
+ // actual
+ // service call.
+
+ threadLC = new LoggingContextFactory.Builder().withBaseContext(getCommonLoggingContext()).build();
+ // Establish the request-specific UUID, as long as we are here...
+ threadLC.put(REQUEST_ID, aUuid);
+ threadLC.put("statusCode", "COMPLETE");
+ threadLC.put(EcompFields.kEndTimestamp, SaClock.now());
+ return threadLC;
+ }
+
+ public static void setUpEcompLogging() {
+
+ // Create ECOMP Logger instances
+ auditLog = LoggerFactory.getLogger("com.att.ecomp.audit");
+ metricsLog = LoggerFactory.getLogger("com.att.ecomp.metrics");
+ debugLog = LoggerFactory.getLogger("com.att.ecomp.debug");
+ errorLog = LoggerFactory.getLogger("com.att.ecomp.error");
+
+ final LoggingContext lc = getCommonLoggingContext();
+
+ String ipAddr = IP_ADDRESS;
+ String hostname = HOST_NAME;
+ try {
+ final InetAddress ip = InetAddress.getLocalHost();
+ hostname = ip.getCanonicalHostName();
+ ipAddr = ip.getHostAddress();
+ } catch (UnknownHostException x) {
+ Log.debug(x.getMessage());
+ }
+
+ lc.put("serverName", hostname);
+ lc.put("serviceName", "VESCollecor");
+ lc.put("statusCode", "RUNNING");
+ lc.put("targetEntity", "NULL");
+ lc.put("targetServiceName", "NULL");
+ lc.put("server", hostname);
+ lc.put("serverIpAddress", ipAddr);
+
+ // instance UUID is meaningless here, so we just create a new one each
+ // time the
+ // server starts. One could argue each new instantiation of the service
+ // should
+ // have a new instance ID.
+ lc.put("instanceUuid", "");
+ lc.put("severity", "");
+ lc.put(EcompFields.kEndTimestamp, SaClock.now());
+ lc.put("EndTimestamp", SaClock.now());
+ lc.put("partnerName", "NA");
+ }
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java
index 5865b12c..179e8826 100644
--- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java
@@ -19,21 +19,19 @@
*/
package org.onap.dcae.commonFunction.event.publishing;
-import static io.vavr.API.List;
-import static io.vavr.API.Try;
-import static io.vavr.API.Tuple;
-import static io.vavr.API.unchecked;
-import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError;
-import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
-
import io.vavr.collection.List;
import io.vavr.collection.Map;
import io.vavr.control.Option;
import io.vavr.control.Try;
+import org.onap.dcae.commonFunction.AnyNode;
+
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
-import org.onap.dcae.commonFunction.AnyNode;
+
+import static io.vavr.API.*;
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError;
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
/**
* @author Pawel Szalapski (pawel.szalapski@nokia.com)
@@ -43,23 +41,23 @@ public final class DMaaPConfigurationParser {
public static Try<Map<String, PublisherConfig>> parseToDomainMapping(Path configLocation) {
return readFromFile(configLocation)
- .flatMap(DMaaPConfigurationParser::toJSON)
- .flatMap(DMaaPConfigurationParser::toConfigMap);
+ .flatMap(DMaaPConfigurationParser::toJSON)
+ .flatMap(DMaaPConfigurationParser::toConfigMap);
}
private static Try<String> readFromFile(Path configLocation) {
return Try(() -> new String(Files.readAllBytes(configLocation)))
- .mapFailure(enhanceError(f("Could not read DMaaP configuration from location: '%s'", configLocation)));
+ .mapFailure(enhanceError(f("Could not read DMaaP configuration from location: '%s'", configLocation)));
}
private static Try<AnyNode> toJSON(String config) {
return Try(() -> AnyNode.fromString(config))
- .mapFailure(enhanceError(f("DMaaP configuration '%s' is not a valid JSON document", config)));
+ .mapFailure(enhanceError(f("DMaaP configuration '%s' is not a valid JSON document", config)));
}
private static Try<Map<String, PublisherConfig>> toConfigMap(AnyNode config) {
return Try(() -> usesLegacyFormat(config) ? parseLegacyFormat(config) : parseNewFormat(config))
- .mapFailure(enhanceError(f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config)));
+ .mapFailure(enhanceError(f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config)));
}
private static boolean usesLegacyFormat(AnyNode dMaaPConfig) {
@@ -68,40 +66,40 @@ public final class DMaaPConfigurationParser {
private static Map<String, PublisherConfig> parseLegacyFormat(AnyNode root) {
return root.get("channels").toList().toMap(
- channel -> channel.get("name").toString(),
- channel -> {
- String destinationsStr = channel.getAsOption("cambria.url")
- .getOrElse(channel.getAsOption("cambria.hosts").get())
- .toString();
- String topic = channel.get("cambria.topic").toString();
- Option<String> maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString);
- Option<String> maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString);
- List<String> destinations = List(destinationsStr.split(","));
- return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
- });
+ channel -> channel.get("name").toString(),
+ channel -> {
+ String destinationsStr = channel.getAsOption("cambria.url")
+ .getOrElse(channel.getAsOption("cambria.hosts").get())
+ .toString();
+ String topic = channel.get("cambria.topic").toString();
+ Option<String> maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString);
+ Option<String> maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString);
+ List<String> destinations = List(destinationsStr.split(","));
+ return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
+ });
}
private static Map<String, PublisherConfig> parseNewFormat(AnyNode root) {
return root.keys().toMap(
- channelName -> channelName,
- channelName -> {
- AnyNode channelConfig = root.get(channelName);
- Option<String> maybeUser = channelConfig.getAsOption("aaf_username").map(AnyNode::toString);
- Option<String> maybePassword = channelConfig.getAsOption("aaf_password").map(AnyNode::toString);
- URL topicURL = unchecked(
- () -> new URL(channelConfig.get("dmaap_info").get("topic_url").toString())).apply();
- String[] pathSegments = topicURL.getPath().substring(1).split("/");
- String topic = pathSegments[1];
- String destination = "events".equals(pathSegments[0]) ? topicURL.getAuthority() : topicURL.getHost();
- List<String> destinations = List(destination);
- return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
- });
+ channelName -> channelName,
+ channelName -> {
+ AnyNode channelConfig = root.get(channelName);
+ Option<String> maybeUser = channelConfig.getAsOption("aaf_username").map(AnyNode::toString);
+ Option<String> maybePassword = channelConfig.getAsOption("aaf_password").map(AnyNode::toString);
+ URL topicURL = unchecked(
+ () -> new URL(channelConfig.get("dmaap_info").get("topic_url").toString())).apply();
+ String[] pathSegments = topicURL.getPath().substring(1).split("/");
+ String topic = pathSegments[1];
+ String destination = "events".equals(pathSegments[0]) ? topicURL.getAuthority() : topicURL.getHost();
+ List<String> destinations = List(destination);
+ return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
+ });
}
private static PublisherConfig buildBasedOnAuth(Option<String> maybeUser, Option<String> maybePassword,
String topic, List<String> destinations) {
return maybeUser.flatMap(user -> maybePassword.map(password -> Tuple(user, password)))
- .map(credentials -> new PublisherConfig(destinations, topic, credentials._1, credentials._2))
- .getOrElse(new PublisherConfig(destinations, topic));
+ .map(credentials -> new PublisherConfig(destinations, topic, credentials._1, credentials._2))
+ .getOrElse(new PublisherConfig(destinations, topic));
}
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java
index fd9b3ae1..a0ee3bfb 100644
--- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java
@@ -21,20 +21,21 @@
package org.onap.dcae.commonFunction.event.publishing;
-import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
-
import com.att.nsa.cambria.client.CambriaBatchingPublisher;
import com.att.nsa.clock.SaClock;
import com.att.nsa.logging.LoggingContext;
import com.att.nsa.logging.log4j.EcompFields;
import io.vavr.collection.Map;
import io.vavr.control.Try;
-import java.io.IOException;
import org.json.JSONObject;
import org.onap.dcae.commonFunction.VESLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
+
/**
* @author Pawel Szalapski (pawel.szalapski@nokia.com)
*/
@@ -55,9 +56,9 @@ class DMaaPEventPublisher implements EventPublisher {
public void sendEvent(JSONObject event, String domain) {
clearVesUniqueIdFromEvent(event);
publishersCache.getPublisher(domain)
- .onEmpty(() ->
- log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event)))
- .forEach(publisher -> sendEvent(event, domain, publisher));
+ .onEmpty(() ->
+ log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event)))
+ .forEach(publisher -> sendEvent(event, domain, publisher));
}
@Override
@@ -67,11 +68,11 @@ class DMaaPEventPublisher implements EventPublisher {
private void sendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) {
Try.run(() -> uncheckedSendEvent(event, domain, publisher))
- .onFailure(exc -> closePublisher(event, domain, exc));
+ .onFailure(exc -> closePublisher(event, domain, exc));
}
private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher)
- throws IOException {
+ throws IOException {
int pendingMsgs = publisher.send("MyPartitionKey", event.toString());
if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) {
log.info("Pending messages count: " + pendingMsgs);
@@ -83,7 +84,7 @@ class DMaaPEventPublisher implements EventPublisher {
private void closePublisher(JSONObject event, String domain, Throwable e) {
log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.",
- event, domain), e);
+ event, domain), e);
publishersCache.closePublisherFor(domain);
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java
index a7865a45..489fcbf0 100644
--- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java
@@ -19,15 +19,15 @@
*/
package org.onap.dcae.commonFunction.event.publishing;
-import static io.vavr.API.Try;
-import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError;
-import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
-
import com.att.nsa.cambria.client.CambriaBatchingPublisher;
import com.att.nsa.cambria.client.CambriaClientBuilders;
import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
import io.vavr.control.Try;
+import static io.vavr.API.Try;
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError;
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
+
/**
* @author Pawel Szalapski (pawel.szalapski@nokia.com)
*/
@@ -36,7 +36,7 @@ final class DMaaPPublishersBuilder {
@SuppressWarnings("mapFailure takes a generic varargs, unchecked because of Javas type system limitation, actually safe to do")
static Try<CambriaBatchingPublisher> buildPublisher(PublisherConfig config) {
return Try(() -> builder(config).build())
- .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config)));
+ .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config)));
}
private static PublisherBuilder builder(PublisherConfig config) {
@@ -49,14 +49,14 @@ final class DMaaPPublishersBuilder {
private static PublisherBuilder authenticatedBuilder(PublisherConfig config) {
return unAuthenticatedBuilder(config)
- .usingHttps()
- .authenticatedByHttp(config.userName().get(), config.password().get());
+ .usingHttps()
+ .authenticatedByHttp(config.userName().get(), config.password().get());
}
private static PublisherBuilder unAuthenticatedBuilder(PublisherConfig config) {
return new CambriaClientBuilders.PublisherBuilder()
- .usingHosts(config.destinations().mkString(","))
- .onTopic(config.topic())
- .logSendFailuresAfter(5);
+ .usingHosts(config.destinations().mkString(","))
+ .onTopic(config.topic())
+ .logSendFailuresAfter(5);
}
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java
index 102d2774..4cdf92da 100644
--- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java
@@ -20,24 +20,20 @@
*/
package org.onap.dcae.commonFunction.event.publishing;
-import static io.vavr.API.Option;
-import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
-
import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.*;
import io.vavr.collection.Map;
import io.vavr.control.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import javax.annotation.Nonnull;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import static io.vavr.API.Option;
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
/**
* @author Pawel Szalapski (pawel.szalapski@nokia.com)
@@ -51,8 +47,8 @@ class DMaaPPublishersCache {
DMaaPPublishersCache(Map<String, PublisherConfig> dMaaPConfiguration) {
this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration);
this.publishersCache = CacheBuilder.newBuilder()
- .removalListener(new OnPublisherRemovalListener())
- .build(new CambriaPublishersCacheLoader());
+ .removalListener(new OnPublisherRemovalListener())
+ .build(new CambriaPublishersCacheLoader());
}
DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader,
@@ -60,8 +56,8 @@ class DMaaPPublishersCache {
Map<String, PublisherConfig> dMaaPConfiguration) {
this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration);
this.publishersCache = CacheBuilder.newBuilder()
- .removalListener(onPublisherRemovalListener)
- .build(dMaaPPublishersCacheLoader);
+ .removalListener(onPublisherRemovalListener)
+ .build(dMaaPPublishersCacheLoader);
}
Option<CambriaBatchingPublisher> getPublisher(String streamID) {
@@ -80,9 +76,9 @@ class DMaaPPublishersCache {
synchronized void reconfigure(Map<String, PublisherConfig> newConfig) {
Map<String, PublisherConfig> currentConfig = dMaaPConfiguration.get();
Map<String, PublisherConfig> removedConfigurations = currentConfig
- .filterKeys(domain -> !newConfig.containsKey(domain));
+ .filterKeys(domain -> !newConfig.containsKey(domain));
Map<String, PublisherConfig> changedConfigurations = newConfig
- .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e)));
+ .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e)));
dMaaPConfiguration.set(newConfig);
removedConfigurations.merge(changedConfigurations).forEach(e -> publishersCache.invalidate(e._1));
}
@@ -99,7 +95,7 @@ class DMaaPPublishersCache {
java.util.List<?> stuck = publisher.close(timeout, unit);
if (!stuck.isEmpty()) {
log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', "
- + "%s messages were dropped", stuck.size(), timeout, unit));
+ + "%s messages were dropped", stuck.size(), timeout, unit));
}
} catch (InterruptedException | IOException e) {
log.error("Could not close Cambria publisher, some messages might have been dropped", e);
@@ -113,11 +109,11 @@ class DMaaPPublishersCache {
@Override
public CambriaBatchingPublisher load(@Nonnull String domain) {
return dMaaPConfiguration.get()
- .get(domain)
- .toTry(() -> new RuntimeException(
- f("DMaaP configuration contains no configuration for domain: '%s'", domain)))
- .flatMap(DMaaPPublishersBuilder::buildPublisher)
- .get();
+ .get(domain)
+ .toTry(() -> new RuntimeException(
+ f("DMaaP configuration contains no configuration for domain: '%s'", domain)))
+ .flatMap(DMaaPPublishersBuilder::buildPublisher)
+ .get();
}
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java
index 4a056778..f1cbb8e5 100644
--- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java
@@ -21,6 +21,7 @@ package org.onap.dcae.commonFunction.event.publishing;
import io.vavr.collection.List;
import io.vavr.control.Option;
+
import java.util.Objects;
/**
@@ -76,9 +77,9 @@ public final class PublisherConfig {
}
PublisherConfig that = (PublisherConfig) o;
return Objects.equals(destinations, that.destinations) &&
- Objects.equals(topic, that.topic) &&
- Objects.equals(userName, that.userName) &&
- Objects.equals(password, that.password);
+ Objects.equals(topic, that.topic) &&
+ Objects.equals(userName, that.userName) &&
+ Objects.equals(password, that.password);
}
@Override
@@ -89,10 +90,10 @@ public final class PublisherConfig {
@Override
public String toString() {
return "PublisherConfig{" +
- "destinations=" + destinations +
- ", topic='" + topic + '\'' +
- ", userName='" + userName + '\'' +
- ", password='" + password + '\'' +
- '}';
+ "destinations=" + destinations +
+ ", topic='" + topic + '\'' +
+ ", userName='" + userName + '\'' +
+ ", password='" + password + '\'' +
+ '}';
}
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java
index 9bf3ef8c..78f34ff4 100644
--- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java
@@ -19,11 +19,11 @@
*/
package org.onap.dcae.commonFunction.event.publishing;
-import static io.vavr.API.$;
-
import io.vavr.API;
import io.vavr.API.Match.Case;
+import static io.vavr.API.$;
+
/**
* @author Pawel Szalapski (pawel.szalapski@nokia.com)
*/
diff --git a/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java b/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java
index 99e269c1..ed42a5a4 100644
--- a/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java
+++ b/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,171 +20,167 @@
package org.onap.dcae.controller;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.json.JSONArray;
import org.json.JSONObject;
import org.json.JSONTokener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStreamReader;
+import java.io.*;
import java.util.Map;
public class FetchDynamicConfig {
- private static final Logger log = LoggerFactory.getLogger(FetchDynamicConfig.class);
-
- public static String configFile = "/opt/app/KV-Configuration.json";
- private static String url;
- public static String retString;
- public static String retCBSString;
- private static Map<String, String> env;
+ private static final Logger log = LoggerFactory.getLogger(FetchDynamicConfig.class);
+
+ public static String configFile = "/opt/app/KV-Configuration.json";
+ public static String retString;
+ public static String retCBSString;
+ private static String url;
+ private static Map<String, String> env;
+
+ public FetchDynamicConfig() {
+ }
+
+ public static void main(String[] args) {
+ Boolean areEqual;
+ // Call consul api and identify the CBS Service address and port
+ getconsul();
+ // Construct and invoke CBS API to get application Configuration
+ getCBS();
+ // Verify if data has changed
+ areEqual = verifyConfigChange();
+ // If new config then write data returned into configFile for
+ // LoadDynamicConfig process
+ if (!areEqual) {
+ FetchDynamicConfig fc = new FetchDynamicConfig();
+ fc.writefile(retCBSString);
+ } else {
+ log.info("New config pull results identical - " + configFile + " NOT refreshed");
+ }
+ }
+
+ private static void getconsul() {
+
+ env = System.getenv();
+ for (Map.Entry<String, String> entry : env.entrySet()) {
+ log.info(entry.getKey() + ":" + entry.getValue());
+ }
+
+ if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE")) {
+ // && env.containsKey("HOSTNAME")) {
+ log.info(">>>Dynamic configuration to be fetched from ConfigBindingService");
+ url = env.get("CONSUL_HOST") + ":8500/v1/catalog/service/" + env.get("CONFIG_BINDING_SERVICE");
+
+ retString = executecurl(url);
+
+ } else {
+ log.info(">>>Static configuration to be used");
+ }
+
+ }
+
+ public static boolean verifyConfigChange() {
+
+ boolean areEqual = false;
+ // Read current data
+ try {
+ File f = new File(configFile);
+ if (f.exists() && !f.isDirectory()) {
+
+ String jsonData = LoadDynamicConfig.readFile(configFile);
+ JSONObject jsonObject = new JSONObject(jsonData);
+
+ ObjectMapper mapper = new ObjectMapper();
+
+ JsonNode tree1 = mapper.readTree(jsonObject.toString());
+ JsonNode tree2 = mapper.readTree(retCBSString);
+ areEqual = tree1.equals(tree2);
+ log.info("Comparison value:" + areEqual);
+ } else {
+ log.info("First time config file read: " + configFile);
+ }
+
+ } catch (IOException e) {
+ log.error("Comparison with new fetched data failed" + e.getMessage());
+
+ }
+
+ return areEqual;
+
+ }
+
+ public static void getCBS() {
+
+ env = System.getenv();
+ // consul return as array
+ JSONTokener temp = new JSONTokener(retString);
+ JSONObject cbsjobj = (JSONObject) new JSONArray(temp).get(0);
+
+ String urlPart1 = null;
+ if (cbsjobj.has("ServiceAddress") && cbsjobj.has("ServicePort")) {
+ urlPart1 = cbsjobj.getString("ServiceAddress") + ":" + cbsjobj.getInt("ServicePort");
+ }
+
+ log.info("CONFIG_BINDING_SERVICE DNS RESOLVED:" + urlPart1);
+
+ if (env.containsKey("HOSTNAME")) {
+ url = urlPart1 + "/service_component/" + env.get("HOSTNAME");
+ retCBSString = executecurl(url);
+ } else if (env.containsKey("SERVICE_NAME")) {
+ url = urlPart1 + "/service_component/" + env.get("SERVICE_NAME");
+ retCBSString = executecurl(url);
+ } else {
+ log.error("Service name environment variable - HOSTNAME/SERVICE_NAME not found within container ");
+ }
+
+ }
+
+ private static String executecurl(String url) {
+
+ String[] command = {"curl", "-v", url};
+ ProcessBuilder process = new ProcessBuilder(command);
+ Process p;
+ String result = null;
+ try {
+ p = process.start();
+ InputStreamReader ipr = new InputStreamReader(p.getInputStream());
+ BufferedReader reader = new BufferedReader(ipr);
+ StringBuilder builder = new StringBuilder();
+ String line;
+
+ while ((line = reader.readLine()) != null) {
+ builder.append(line);
+ }
+ result = builder.toString();
+ log.info(result);
- public FetchDynamicConfig() {
- }
+ reader.close();
+ ipr.close();
+ } catch (IOException e) {
+ log.error("error", e);
+ e.printStackTrace();
+ }
+ return result;
- public static void main(String[] args) {
- Boolean areEqual;
- // Call consul api and identify the CBS Service address and port
- getconsul();
- // Construct and invoke CBS API to get application Configuration
- getCBS();
- // Verify if data has changed
- areEqual = verifyConfigChange();
- // If new config then write data returned into configFile for
- // LoadDynamicConfig process
- if (! areEqual) {
- FetchDynamicConfig fc = new FetchDynamicConfig();
- fc.writefile(retCBSString);
- } else {
- log.info("New config pull results identical - " + configFile + " NOT refreshed");
- }
- }
+ }
- private static void getconsul() {
-
- env = System.getenv();
- for (Map.Entry<String, String> entry : env.entrySet()) {
- log.info(entry.getKey() + ":" + entry.getValue());
- }
-
- if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE")) {
- // && env.containsKey("HOSTNAME")) {
- log.info(">>>Dynamic configuration to be fetched from ConfigBindingService");
- url = env.get("CONSUL_HOST") + ":8500/v1/catalog/service/" + env.get("CONFIG_BINDING_SERVICE");
-
- retString = executecurl(url);
-
- } else {
- log.info(">>>Static configuration to be used");
- }
-
- }
-
- public static boolean verifyConfigChange() {
-
- boolean areEqual = false;
- // Read current data
- try {
- File f = new File(configFile);
- if (f.exists() && !f.isDirectory()) {
-
- String jsonData = LoadDynamicConfig.readFile(configFile);
- JSONObject jsonObject = new JSONObject(jsonData);
-
- ObjectMapper mapper = new ObjectMapper();
-
- JsonNode tree1 = mapper.readTree(jsonObject.toString());
- JsonNode tree2 = mapper.readTree(retCBSString);
- areEqual = tree1.equals(tree2);
- log.info("Comparison value:" + areEqual);
- } else {
- log.info("First time config file read: " + configFile);
- }
-
- } catch (IOException e) {
- log.error("Comparison with new fetched data failed" + e.getMessage());
-
- }
-
- return areEqual;
-
- }
-
- public static void getCBS() {
-
- env = System.getenv();
- // consul return as array
- JSONTokener temp = new JSONTokener(retString);
- JSONObject cbsjobj = (JSONObject) new JSONArray(temp).get(0);
-
- String urlPart1 = null;
- if (cbsjobj.has("ServiceAddress") && cbsjobj.has("ServicePort")) {
- urlPart1 = cbsjobj.getString("ServiceAddress") + ":" + cbsjobj.getInt("ServicePort");
- }
-
- log.info("CONFIG_BINDING_SERVICE DNS RESOLVED:" + urlPart1);
-
- if (env.containsKey("HOSTNAME")) {
- url = urlPart1 + "/service_component/" + env.get("HOSTNAME");
- retCBSString = executecurl(url);
- } else if (env.containsKey("SERVICE_NAME")) {
- url = urlPart1 + "/service_component/" + env.get("SERVICE_NAME");
- retCBSString = executecurl(url);
- } else {
- log.error("Service name environment variable - HOSTNAME/SERVICE_NAME not found within container ");
- }
-
- }
-
- public void writefile(String retCBSString) {
- log.info("URL to fetch configuration:" + url + " Return String:" + retCBSString);
-
- String indentedretstring = (new JSONObject(retCBSString)).toString(4);
-
- try (FileWriter file = new FileWriter(FetchDynamicConfig.configFile)) {
- file.write(indentedretstring);
-
- log.info("Successfully Copied JSON Object to file " + configFile);
- } catch (IOException e) {
- log.error("Error in writing configuration into file " + configFile + retString + e.getMessage());
- e.printStackTrace();
- }
-
- }
-
- private static String executecurl(String url) {
+ public void writefile(String retCBSString) {
+ log.info("URL to fetch configuration:" + url + " Return String:" + retCBSString);
- String[] command = { "curl", "-v", url };
- ProcessBuilder process = new ProcessBuilder(command);
- Process p;
- String result = null;
- try {
- p = process.start();
- InputStreamReader ipr = new InputStreamReader(p.getInputStream());
- BufferedReader reader = new BufferedReader(ipr);
- StringBuilder builder = new StringBuilder();
- String line;
+ String indentedretstring = (new JSONObject(retCBSString)).toString(4);
- while ((line = reader.readLine()) != null) {
- builder.append(line);
- }
- result = builder.toString();
- log.info(result);
+ try (FileWriter file = new FileWriter(FetchDynamicConfig.configFile)) {
+ file.write(indentedretstring);
- reader.close();
- ipr.close();
- } catch (IOException e) {
- log.error("error", e);
- e.printStackTrace();
- }
- return result;
+ log.info("Successfully Copied JSON Object to file " + configFile);
+ } catch (IOException e) {
+ log.error("Error in writing configuration into file " + configFile + retString + e.getMessage());
+ e.printStackTrace();
+ }
- }
+ }
}
diff --git a/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java b/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java
index a8ecaba0..c1ab80c1 100644
--- a/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java
+++ b/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -35,95 +35,94 @@ import java.util.Map;
public class LoadDynamicConfig {
- private static final Logger log = LoggerFactory.getLogger(LoadDynamicConfig.class);
-
- public String propFile = "collector.properties";
- public String configFile = "/opt/app/KV-Configuration.json";
- public String dMaaPOutputFile = "./etc/DmaapConfig.json";
-
- public LoadDynamicConfig() {
-
- }
-
- public static void main(String[] args) {
- Map<String, String> env = System.getenv();
-
- // Check again to ensure new controller deployment related config
- if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE")
- && env.containsKey("HOSTNAME")) {
-
- try {
-
- LoadDynamicConfig lc = new LoadDynamicConfig();
- String jsonData = readFile(lc.configFile);
- JSONObject jsonObject = new JSONObject(jsonData);
- lc.writeconfig(jsonObject);
-
-
- } catch (Exception e) {
- log.error(e.getLocalizedMessage(), e);
- e.printStackTrace();
-
- }
-
- } else {
- log.info(">>>Static configuration to be used");
- }
-
- }
-
- public void writeconfig (JSONObject jsonObject)
- {
-
- PropertiesConfiguration conf;
- try {
- conf = new PropertiesConfiguration(propFile);
-
- conf.setEncoding(null);
-
- // update properties based on consul dynamic configuration
- Iterator<?> keys = jsonObject.keys();
-
- while (keys.hasNext()) {
- String key = (String) keys.next();
- // check if any configuration is related to dmaap
- // and write into dmaapconfig.json
- if (key.startsWith("streams_publishes")) {
- // VESCollector only have publish streams
- try (FileWriter file = new FileWriter(dMaaPOutputFile)) {
- String indentedretstring=(new JSONObject(jsonObject.get(key).toString())).toString(4);
- file.write(indentedretstring);
- log.info("Successfully written JSON Object to DmaapConfig.json");
- } catch (IOException e) {
- log.info("Error in writing dmaap configuration into DmaapConfig.json", e);
- }
- } else {
- conf.setProperty(key, jsonObject.get(key).toString());
- }
-
- }
- conf.save();
- } catch (ConfigurationException e) {
- log.error(e.getLocalizedMessage(), e);
- e.printStackTrace();
- }
- }
-
- public static String readFile(String filename) {
- String result = "";
- try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
- StringBuilder sb = new StringBuilder();
- String line = br.readLine();
- while (line != null) {
- sb.append(line);
- line = br.readLine();
- }
- result = sb.toString();
- } catch (Exception e) {
- log.error(e.getLocalizedMessage(), e);
- e.printStackTrace();
- }
- return result;
- }
+ private static final Logger log = LoggerFactory.getLogger(LoadDynamicConfig.class);
+
+ public String propFile = "collector.properties";
+ public String configFile = "/opt/app/KV-Configuration.json";
+ public String dMaaPOutputFile = "./etc/DmaapConfig.json";
+
+ public LoadDynamicConfig() {
+
+ }
+
+ public static void main(String[] args) {
+ Map<String, String> env = System.getenv();
+
+ // Check again to ensure new controller deployment related config
+ if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE")
+ && env.containsKey("HOSTNAME")) {
+
+ try {
+
+ LoadDynamicConfig lc = new LoadDynamicConfig();
+ String jsonData = readFile(lc.configFile);
+ JSONObject jsonObject = new JSONObject(jsonData);
+ lc.writeconfig(jsonObject);
+
+
+ } catch (Exception e) {
+ log.error(e.getLocalizedMessage(), e);
+ e.printStackTrace();
+
+ }
+
+ } else {
+ log.info(">>>Static configuration to be used");
+ }
+
+ }
+
+ public static String readFile(String filename) {
+ String result = "";
+ try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
+ StringBuilder sb = new StringBuilder();
+ String line = br.readLine();
+ while (line != null) {
+ sb.append(line);
+ line = br.readLine();
+ }
+ result = sb.toString();
+ } catch (Exception e) {
+ log.error(e.getLocalizedMessage(), e);
+ e.printStackTrace();
+ }
+ return result;
+ }
+
+ public void writeconfig(JSONObject jsonObject) {
+
+ PropertiesConfiguration conf;
+ try {
+ conf = new PropertiesConfiguration(propFile);
+
+ conf.setEncoding(null);
+
+ // update properties based on consul dynamic configuration
+ Iterator<?> keys = jsonObject.keys();
+
+ while (keys.hasNext()) {
+ String key = (String) keys.next();
+ // check if any configuration is related to dmaap
+ // and write into dmaapconfig.json
+ if (key.startsWith("streams_publishes")) {
+ // VESCollector only have publish streams
+ try (FileWriter file = new FileWriter(dMaaPOutputFile)) {
+ String indentedretstring = (new JSONObject(jsonObject.get(key).toString())).toString(4);
+ file.write(indentedretstring);
+ log.info("Successfully written JSON Object to DmaapConfig.json");
+ } catch (IOException e) {
+ log.info("Error in writing dmaap configuration into DmaapConfig.json", e);
+ }
+ } else {
+ conf.setProperty(key, jsonObject.get(key).toString());
+ }
+
+ }
+ conf.save();
+ } catch (ConfigurationException e) {
+ log.error(e.getLocalizedMessage(), e);
+ e.printStackTrace();
+ }
+ }
}
diff --git a/src/main/java/org/onap/dcae/restapi/ApiAuthInterceptor.java b/src/main/java/org/onap/dcae/restapi/ApiAuthInterceptor.java
new file mode 100644
index 00000000..864a16d7
--- /dev/null
+++ b/src/main/java/org/onap/dcae/restapi/ApiAuthInterceptor.java
@@ -0,0 +1,78 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.restapi;
+
+import io.vavr.control.Option;
+import org.onap.dcae.ApplicationSettings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.Base64;
+
+final class ApiAuthInterceptor extends HandlerInterceptorAdapter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ApiAuthInterceptor.class);
+ private final ApplicationSettings applicationSettings;
+
+ private Logger errorLog;
+
+ ApiAuthInterceptor(ApplicationSettings applicationSettings, Logger errorLog) {
+ this.applicationSettings = applicationSettings;
+ this.errorLog = errorLog;
+ }
+
+ @Override
+ public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
+ Object handler) throws IOException {
+ if (applicationSettings.authorizationEnabled()) {
+ String authorizationHeader = request.getHeader("Authorization");
+ if (authorizationHeader == null || !isAuthorized(authorizationHeader)) {
+ response.setStatus(400);
+ errorLog.error("EVENT_RECEIPT_FAILURE: Unauthorized user");
+ response.getWriter().write(ApiException.UNAUTHORIZED_USER.toJSON().toString());
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean isAuthorized(String authorizationHeader) {
+ try {
+ String encodedData = authorizationHeader.split(" ")[1];
+ String decodedData = new String(Base64.getDecoder().decode(encodedData));
+ String providedUser = decodedData.split(":")[0].trim();
+ String providedPassword = decodedData.split(":")[1].trim();
+ Option<String> maybeSavedPassword = applicationSettings.validAuthorizationCredentials().get(providedUser);
+ boolean userRegistered = maybeSavedPassword.isDefined();
+ return userRegistered && maybeSavedPassword.get().equals(providedPassword);
+ } catch (Exception e) {
+ LOG.warn(String.format("Could not check if user is authorized (header: '%s')), probably malformed header.",
+ authorizationHeader), e);
+ return false;
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/restapi/endpoints/Ui.java b/src/main/java/org/onap/dcae/restapi/ApiConfiguration.java
index ae593b44..85db81df 100644
--- a/src/main/java/org/onap/dcae/restapi/endpoints/Ui.java
+++ b/src/main/java/org/onap/dcae/restapi/ApiConfiguration.java
@@ -1,15 +1,16 @@
-/*-
+/*
* ============LICENSE_START=======================================================
* PROJECT
* ================================================================================
* Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.s
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,14 +19,28 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.restapi.endpoints;
+package org.onap.dcae.restapi;
-import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;
-import com.att.nsa.drumlin.service.framework.context.DrumlinRequestContext;
+import org.onap.dcae.ApplicationSettings;
+import org.slf4j.Logger;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
-public class Ui extends NsaBaseEndpoint {
+@Configuration
+public class ApiConfiguration implements WebMvcConfigurer {
+ private final ApplicationSettings applicationSettings;
+ private Logger errorLogger;
- public static void hello(DrumlinRequestContext ctx) {
- ctx.renderer().renderTemplate("templates/hello.html");
+ @Autowired
+ ApiConfiguration(ApplicationSettings applicationSettings, Logger errorLogger) {
+ this.applicationSettings = applicationSettings;
+ this.errorLogger = errorLogger;
+ }
+
+ @Override
+ public void addInterceptors(InterceptorRegistry registry) {
+ registry.addInterceptor(new ApiAuthInterceptor(applicationSettings, errorLogger));
}
}
diff --git a/src/main/java/org/onap/dcae/restapi/ApiException.java b/src/main/java/org/onap/dcae/restapi/ApiException.java
index 0f922678..53895ffe 100644
--- a/src/main/java/org/onap/dcae/restapi/ApiException.java
+++ b/src/main/java/org/onap/dcae/restapi/ApiException.java
@@ -33,10 +33,10 @@ public enum ApiException {
UNAUTHORIZED_USER(ExceptionType.POLICY_EXCEPTION, "POL2000", "Unauthorized user", 401),
NO_SERVER_RESOURCES(ExceptionType.SERVICE_EXCEPTION, "SVC1000", "No server resources (internal processing queue full)", 503);
+ public final int httpStatusCode;
private final ExceptionType type;
private final String code;
private final String details;
- public final int httpStatusCode;
ApiException(ExceptionType type, String code, String details, int httpStatusCode) {
this.type = type;
@@ -45,18 +45,9 @@ public enum ApiException {
this.httpStatusCode = httpStatusCode;
}
- public enum ExceptionType {
- SERVICE_EXCEPTION, POLICY_EXCEPTION;
-
- @Override
- public String toString() {
- return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, this.name());
- }
- }
-
public JSONObject toJSON() {
JSONObject exceptionTypeNode = new JSONObject();
- exceptionTypeNode.put("messageId", code );
+ exceptionTypeNode.put("messageId", code);
exceptionTypeNode.put("text", details);
JSONObject requestErrorNode = new JSONObject();
@@ -67,4 +58,13 @@ public enum ApiException {
return rootNode;
}
+ public enum ExceptionType {
+ SERVICE_EXCEPTION, POLICY_EXCEPTION;
+
+ @Override
+ public String toString() {
+ return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, this.name());
+ }
+ }
+
}
diff --git a/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java b/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java
deleted file mode 100644
index e5a29e9f..00000000
--- a/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java
+++ /dev/null
@@ -1,127 +0,0 @@
-
-/*
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.restapi;
-
-import java.io.IOException;
-import java.net.URL;
-
-import javax.servlet.ServletException;
-
-import org.apache.tomcat.util.codec.binary.Base64;
-import org.onap.dcae.ApplicationSettings;
-import org.onap.dcae.commonFunction.CommonStartup;
-import org.onap.dcae.commonFunction.VESLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.nsa.apiServer.CommonServlet;
-import com.att.nsa.configs.ConfigDbException;
-import com.att.nsa.drumlin.service.framework.routing.DrumlinRequestRouter;
-import com.att.nsa.drumlin.service.framework.routing.playish.DrumlinPlayishRoutingFileSource;
-import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.loadException;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
-import com.att.nsa.security.NsaAuthenticator;
-
-import com.att.nsa.security.authenticators.SimpleAuthenticator;
-import com.att.nsa.security.db.simple.NsaSimpleApiKey;
-
-public class RestfulCollectorServlet extends CommonServlet
-{
-
- private static final long serialVersionUID = 1L;
- private static final Logger log = LoggerFactory.getLogger ( RestfulCollectorServlet.class );
-
- private static String authCredentialsList;
-
- public RestfulCollectorServlet ( ApplicationSettings settings ) throws loadException, missingReqdSetting
- {
- super ( settings.torrNvReadable(), "collector", false );
- authCredentialsList = settings.validAuthorizationCredentials();
- }
-
-
-
-
- /**
- * This is called once at server start. Use it to init any shared objects and setup the route mapping.
- */
- @Override
- protected void servletSetup () throws rrNvReadable.missingReqdSetting, rrNvReadable.invalidSettingValue, ServletException
- {
- super.servletSetup ();
-
- try {
- // the base class provides a bunch of things like API authentication and ECOMP compliant
- // logging. The Restful Collector likely doesn't need API authentication, so for now,
- // we init the base class services with an in-memory (and empty!) config DB.
- commonServletSetup ( ConfigDbType.MEMORY );
-
- VESLogger.setUpEcompLogging();
-
- // setup the servlet routing and error handling
- final DrumlinRequestRouter drr = getRequestRouter ();
-
- // you can tell the request router what to do when a particular kind of exception is thrown.
- drr.setHandlerForException(IllegalArgumentException.class,
- (ctx, cause) -> sendJsonReply (ctx, HttpStatusCodes.k400_badRequest, cause.getMessage() ));
-
- // load the routes from the config file
- final URL routes = findStream ( "routes.conf" );
- if ( routes == null ) throw new rrNvReadable.missingReqdSetting ( "No routing configuration." );
- final DrumlinPlayishRoutingFileSource drs = new DrumlinPlayishRoutingFileSource ( routes );
- drr.addRouteSource ( drs );
-
- if (CommonStartup.authflag) {
- NsaAuthenticator<NsaSimpleApiKey> NsaAuth;
- NsaAuth = createAuthenticator(authCredentialsList);
-
- this.getSecurityManager().addAuthenticator(NsaAuth);
- }
-
- log.info ( "Restful Collector Servlet is up." );
- }
- catch ( SecurityException | IOException | ConfigDbException e ) {
- throw new ServletException ( e );
- }
- }
-
- public NsaAuthenticator<NsaSimpleApiKey> createAuthenticator(String authCredentials) {
- NsaAuthenticator<NsaSimpleApiKey> authenticator = new SimpleAuthenticator();
- if (authCredentials != null) {
- String authpair[] = authCredentials.split("\\|");
- for (String pair : authpair) {
- String lineid[] = pair.split(",");
- String listauthid = lineid[0];
- String listauthpwd = new String(Base64.decodeBase64(lineid[1]));
- ((SimpleAuthenticator) authenticator).add(listauthid, listauthpwd);
- }
-
- } else {
- ((SimpleAuthenticator) authenticator).add("admin", "collectorpasscode");
- }
- return authenticator;
- }
-
-}
-
diff --git a/src/main/java/org/onap/dcae/restapi/ServletConfig.java b/src/main/java/org/onap/dcae/restapi/ServletConfig.java
new file mode 100644
index 00000000..e8efa375
--- /dev/null
+++ b/src/main/java/org/onap/dcae/restapi/ServletConfig.java
@@ -0,0 +1,79 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.s
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.restapi;
+
+import org.onap.dcae.ApplicationSettings;
+import org.onap.dcae.SchemaValidator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.web.server.Ssl;
+import org.springframework.boot.web.server.WebServerFactoryCustomizer;
+import org.springframework.boot.web.servlet.server.ConfigurableServletWebServerFactory;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+
+import static java.nio.file.Files.readAllBytes;
+
+@Component
+public class ServletConfig implements WebServerFactoryCustomizer<ConfigurableServletWebServerFactory> {
+
+ private static final Logger log = LoggerFactory.getLogger(SchemaValidator.class);
+
+ @Autowired
+ private ApplicationSettings properties;
+
+ @Override
+ public void customize(ConfigurableServletWebServerFactory container) {
+ if (properties.authorizationEnabled()) {
+ container.setSsl(createSSL());
+ container.setPort(properties.httpsPort());
+ } else {
+ container.setPort(properties.httpPort());
+ }
+ }
+
+ private Ssl createSSL() {
+ log.info("Enabling SSL");
+ Ssl ssl = new Ssl();
+ ssl.setEnabled(true);
+ String keyStore = Paths.get(properties.keystoreFileLocation()).toAbsolutePath().toString();
+ log.info("Using keyStore path: " + keyStore);
+ ssl.setKeyStore(keyStore);
+ String keyPasswordFileLocation = Paths.get(properties.keystorePasswordFileLocation()).toAbsolutePath().toString();
+ log.info("Using keyStore password from: " + keyPasswordFileLocation);
+ ssl.setKeyPassword(getKeyStorePassword(keyPasswordFileLocation));
+ ssl.setKeyAlias(properties.keystoreAlias());
+ return ssl;
+ }
+
+ private String getKeyStorePassword(String location) {
+ try {
+ return new String(readAllBytes(Paths.get(location)));
+ } catch (IOException e) {
+ log.error("Could not read keystore password from: '" + location + "'.", e);
+ throw new RuntimeException(e);
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/org/onap/dcae/restapi/VesRestController.java b/src/main/java/org/onap/dcae/restapi/VesRestController.java
new file mode 100644
index 00000000..b7fc5f3b
--- /dev/null
+++ b/src/main/java/org/onap/dcae/restapi/VesRestController.java
@@ -0,0 +1,214 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.s
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.restapi;
+
+import static java.util.Optional.ofNullable;
+import static java.util.stream.StreamSupport.stream;
+import static org.springframework.http.ResponseEntity.accepted;
+import static org.springframework.http.ResponseEntity.ok;
+
+import com.att.nsa.clock.SaClock;
+import com.att.nsa.logging.LoggingContext;
+import com.att.nsa.logging.log4j.EcompFields;
+import com.github.fge.jackson.JsonLoader;
+import com.github.fge.jsonschema.core.report.ProcessingReport;
+import com.github.fge.jsonschema.main.JsonSchema;
+
+import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import javax.servlet.http.HttpServletRequest;
+
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.onap.dcae.ApplicationSettings;
+import org.onap.dcae.CollectorSchemas;
+import org.onap.dcae.commonFunction.VESLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+public class VesRestController {
+
+ private static final Logger LOG = LoggerFactory.getLogger(VesRestController.class);
+
+ private static final String FALLBACK_VES_VERSION = "v5";
+
+ @Autowired
+ private ApplicationSettings collectorProperties;
+
+ @Autowired
+ private CollectorSchemas schemas;
+
+ @Autowired
+ @Qualifier("metriclog")
+ private Logger metriclog;
+
+ @Autowired
+ @Qualifier("incomingRequestsLogger")
+ private Logger incomingRequestsLogger;
+
+ @Autowired
+ @Qualifier("errorLog")
+ private Logger errorLog;
+
+ private LinkedBlockingQueue<JSONObject> inputQueue;
+ private String version;
+
+ @Autowired
+ VesRestController(@Qualifier("incomingRequestsLogger") Logger incomingRequestsLogger,
+ @Qualifier("inputQueue") LinkedBlockingQueue<JSONObject> inputQueue) {
+ this.incomingRequestsLogger = incomingRequestsLogger;
+ this.inputQueue = inputQueue;
+ }
+
+ @GetMapping("/")
+ String mainPage() {
+ return "Welcome to VESCollector";
+ }
+
+ //refactor in next iteration
+ @PostMapping(value = {"/eventListener/v1",
+ "/eventListener/v1/eventBatch",
+ "/eventListener/v2",
+ "/eventListener/v2/eventBatch",
+ "/eventListener/v3",
+ "/eventListener/v3/eventBatch",
+ "/eventListener/v4",
+ "/eventListener/v4/eventBatch",
+ "/eventListener/v5",
+ "/eventListener/v5/eventBatch"}, consumes = "application/json")
+ ResponseEntity<String> receiveEvent(@RequestBody String jsonPayload, HttpServletRequest httpServletRequest) {
+ String request = httpServletRequest.getRequestURI();
+ extractVersion(request);
+
+ JSONObject jsonObject;
+ try {
+ jsonObject = new JSONObject(jsonPayload);
+ } catch (Exception e) {
+ return ResponseEntity.badRequest().body(ApiException.INVALID_JSON_INPUT.toJSON().toString());
+ }
+
+ String uuid = setUpECOMPLoggingForRequest();
+ incomingRequestsLogger.info(String.format(
+ "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'",
+ jsonObject, uuid, version, httpServletRequest.getRemoteHost()));
+
+ if (collectorProperties.jsonSchemaValidationEnabled()) {
+ if (isBatchRequest(request) && (jsonObject.has("eventList") && (!jsonObject.has("event")))) {
+ if (!conformsToSchema(jsonObject, version)) {
+ return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
+ }
+ } else if (!isBatchRequest(request) && (!jsonObject.has("eventList") && (jsonObject.has("event")))) {
+ if (!conformsToSchema(jsonObject, version)) {
+ return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
+ }
+ } else {
+ return errorResponse(ApiException.INVALID_JSON_INPUT);
+ }
+ }
+
+ JSONArray commonlyFormatted = convertToJSONArrayCommonFormat(jsonObject, request, uuid, version);
+
+ if (!putEventsOnProcessingQueue(commonlyFormatted)) {
+ errorLog.error("EVENT_RECEIPT_FAILURE: QueueFull " + ApiException.NO_SERVER_RESOURCES);
+ return errorResponse(ApiException.NO_SERVER_RESOURCES);
+ }
+ return ok().contentType(MediaType.APPLICATION_JSON).body("Message Accepted");
+ }
+
+ private void extractVersion(String httpServletRequest) {
+ version = httpServletRequest.split("/")[2];
+ }
+
+ private ResponseEntity<String> errorResponse(ApiException noServerResources) {
+ return ResponseEntity.status(noServerResources.httpStatusCode)
+ .body(noServerResources.toJSON().toString());
+ }
+
+ private boolean putEventsOnProcessingQueue(JSONArray arrayOfEvents) {
+ for (int i = 0; i < arrayOfEvents.length(); i++) {
+ metriclog.info("EVENT_PUBLISH_START");
+ if (!inputQueue.offer((JSONObject) arrayOfEvents.get(i))) {
+ return false;
+ }
+ }
+ LOG.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
+ metriclog.info("EVENT_PUBLISH_END");
+ return true;
+ }
+
+ private boolean conformsToSchema(JSONObject payload, String version) {
+ try {
+ JsonSchema schema = ofNullable(schemas.getJSONSchemasMap(version).get(version))
+ .orElse(schemas.getJSONSchemasMap(version).get(FALLBACK_VES_VERSION));
+ ProcessingReport report = schema.validate(JsonLoader.fromString(payload.toString()));
+ if (!report.isSuccess()) {
+ LOG.warn("Schema validation failed for event: " + payload);
+ stream(report.spliterator(), false).forEach(e -> LOG.warn(e.getMessage()));
+ return false;
+ }
+ return report.isSuccess();
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to validate against schema", e);
+ }
+ }
+
+ private static JSONArray convertToJSONArrayCommonFormat(JSONObject jsonObject, String request,
+ String uuid, String version) {
+ JSONArray asArrayEvents = new JSONArray();
+ String vesUniqueIdKey = "VESuniqueId";
+ String vesVersionKey = "VESversion";
+ if (isBatchRequest(request)) {
+ JSONArray events = jsonObject.getJSONArray("eventList");
+ for (int i = 0; i < events.length(); i++) {
+ JSONObject event = new JSONObject().put("event", events.getJSONObject(i));
+ event.put(vesUniqueIdKey, uuid + "-" + i);
+ event.put(vesVersionKey, version);
+ asArrayEvents.put(event);
+ }
+ } else {
+ jsonObject.put(vesUniqueIdKey, uuid);
+ jsonObject.put(vesVersionKey, version);
+ asArrayEvents = new JSONArray().put(jsonObject);
+ }
+ return asArrayEvents;
+ }
+
+ private static String setUpECOMPLoggingForRequest() {
+ final UUID uuid = UUID.randomUUID();
+ LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
+ localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
+ return uuid.toString();
+ }
+
+ private static boolean isBatchRequest(String request) {
+ return request.contains("eventBatch");
+ }
+} \ No newline at end of file
diff --git a/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java b/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java
deleted file mode 100644
index d60e2a11..00000000
--- a/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.restapi.endpoints;
-
-import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;
-import com.att.nsa.clock.SaClock;
-import com.att.nsa.drumlin.service.framework.context.DrumlinRequestContext;
-import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
-import com.att.nsa.drumlin.service.standards.MimeTypes;
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.log4j.EcompFields;
-import com.att.nsa.security.db.simple.NsaSimpleApiKey;
-import com.google.gson.JsonParser;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.util.Base64;
-import java.util.UUID;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
-import org.onap.dcae.commonFunction.CommonStartup;
-import org.onap.dcae.commonFunction.CommonStartup.QueueFullException;
-import org.onap.dcae.commonFunction.VESLogger;
-import org.onap.dcae.restapi.ApiException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EventReceipt extends NsaBaseEndpoint {
-
- private static final Logger log = LoggerFactory.getLogger(EventReceipt.class);
- private static final String MESSAGE = " Message:";
-
- public static void receiveVESEvent(DrumlinRequestContext ctx) {
- // the request body carries events. assume for now it's an array
- // of json objects that fits in memory. (See cambria's parsing for
- // handling large messages)
-
- NsaSimpleApiKey retkey = null;
-
-
- JSONObject jsonObject;
- InputStream istr = null;
- int arrayFlag = 0;
- String vesVersion = null;
- String userId=null;
-
- try {
-
-
- istr = ctx.request().getBodyStream();
- jsonObject = new JSONObject(new JSONTokener(istr));
-
- log.info("ctx getPathInContext: " + ctx.request().getPathInContext());
- Pattern p = Pattern.compile("(v\\d+)");
- Matcher m = p.matcher(ctx.request().getPathInContext());
-
- if (m.find()) {
- log.info("VES version:" + m.group());
- vesVersion = m.group();
- }
-
- final UUID uuid = UUID.randomUUID();
- LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
- localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
-
- if (ctx.request().getPathInContext().contains("eventBatch")) {
- CommonStartup.inlog.info(ctx.request().getRemoteAddress() + "VESUniqueID-Prefix:" + uuid
- + " VES Batch Input Messsage: " + jsonObject);
- log.info(ctx.request().getRemoteAddress() + "VESUniqueID-Prefix:" + uuid + " VES Batch Input Messsage: "
- + jsonObject);
- arrayFlag = 1;
- } else {
- CommonStartup.inlog.info(
- ctx.request().getRemoteAddress() + "VESUniqueID:" + uuid + " Input Messsage: " + jsonObject);
- log.info(ctx.request().getRemoteAddress() + "VESUniqueID:" + uuid + " Input Messsage: " + jsonObject);
-
- }
-
- try {
- if (CommonStartup.authflag) {
- userId = getUser (ctx);
- retkey = NsaBaseEndpoint.getAuthenticatedUser(ctx);
- }
- } catch (NullPointerException x) {
- //log.info("Invalid user request :" + userId + " FROM " + ctx.request().getRemoteAddress() + " " + ctx.request().getContentType() + MESSAGE + jsonObject);
- log.info(String.format("Unauthorized request %s FROM %s %s %s %s", getUser(ctx), ctx.request().getRemoteAddress(), ctx.request().getContentType(), MESSAGE, jsonObject));
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Unauthorized user" + userId + x);
- respondWithCustomMsginJson(ctx, ApiException.UNAUTHORIZED_USER);
- return;
- }
-
- if (schemaCheck(retkey, arrayFlag, jsonObject, vesVersion, ctx, uuid)) {
- return;
- }
-
- } catch (JSONException | NullPointerException | IOException x) {
- log.error(String.format("Couldn't parse JSON Array - HttpStatusCodes.k400_badRequest%d%s%s",
- HttpStatusCodes.k400_badRequest, MESSAGE, x.getMessage()));
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request " + x);
- respondWithCustomMsginJson(ctx, ApiException.INVALID_JSON_INPUT);
- return;
- } catch (QueueFullException e) {
- log.error("Collector internal queue full :" + e.getMessage(), e);
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e);
- respondWithCustomMsginJson(ctx, ApiException.NO_SERVER_RESOURCES);
- return;
- } finally {
- if (istr != null) {
- safeClose(istr);
- }
- }
- log.info("MessageAccepted and k200_ok to be sent");
- ctx.response().sendErrorAndBody(HttpStatusCodes.k200_ok, "Message Accepted", MimeTypes.kAppJson);
- }
-
-
- private static String getUser(DrumlinRequestContext ctx){
- String authorization = ctx.request().getFirstHeader("Authorization");
- if (authorization != null && authorization.startsWith("Basic")) {
- String base64Credentials = authorization.substring("Basic".length()).trim();
- String credentials = new String(Base64.getDecoder().decode(base64Credentials),
- Charset.forName("UTF-8"));
- final String[] values = credentials.split(":",2);
- log.debug("User:" + values[0] + " Pwd:" + values[1]);
- return values[0];
- }
- return null;
-
- }
-
- private static Boolean schemaCheck(NsaSimpleApiKey retkey, int arrayFlag,
- JSONObject jsonObject, String vesVersion,
- DrumlinRequestContext ctx, UUID uuid)
- throws JSONException, QueueFullException, IOException {
-
- JSONArray jsonArray;
- JSONArray jsonArrayMod = new JSONArray();
- JSONObject event;
- FileReader fr;
- if (retkey != null || !CommonStartup.authflag) {
- if (CommonStartup.schemaValidatorflag) {
- if ((arrayFlag == 1) && (jsonObject.has("eventList") && (!jsonObject.has("event")))
- || ((arrayFlag == 0) && (!jsonObject.has("eventList") && (jsonObject.has("event"))))) {
- fr = new FileReader(schemaFileVersion(vesVersion));
- String schema = new JsonParser().parse(fr).toString();
-
- String valresult = CommonStartup.validateAgainstSchema(jsonObject.toString(), schema);
- switch (valresult) {
- case "true":
- log.info("Validation successful");
- break;
- case "false":
- log.info("Validation failed");
- respondWithCustomMsginJson(ctx, ApiException.SCHEMA_VALIDATION_FAILED);
- return true;
- default:
- log.error("Validation errored" + valresult);
- respondWithCustomMsginJson(ctx, ApiException.INVALID_JSON_INPUT);
- return true;
- }
- } else {
- log.info("Validation failed");
- respondWithCustomMsginJson(ctx, ApiException.SCHEMA_VALIDATION_FAILED);
- return true;
- }
- if (arrayFlag == 1) {
- jsonArray = jsonObject.getJSONArray("eventList");
- log.info("Validation successful for all events in batch");
- for (int i = 0; i < jsonArray.length(); i++) {
- event = new JSONObject().put("event", jsonArray.getJSONObject(i));
- event.put("VESuniqueId", uuid + "-" + i);
- event.put("VESversion", vesVersion);
- jsonArrayMod.put(event);
- }
- log.info("Modified jsonarray:" + jsonArrayMod.toString());
- } else {
- jsonObject.put("VESuniqueId", uuid);
- jsonObject.put("VESversion", vesVersion);
- jsonArrayMod = new JSONArray().put(jsonObject);
- }
- }
-
- // reject anything that's not JSON
- if (!ctx.request().getContentType().equalsIgnoreCase("application/json")) {
- log.info(String.format("Rejecting request with content type %s Message:%s",
- ctx.request().getContentType(), jsonObject));
- respondWithCustomMsginJson(ctx, ApiException.INVALID_CONTENT_TYPE);
- return true;
- }
-
- CommonStartup.handleEvents(jsonArrayMod);
- } else {
- log.info(String.format("Unauthorized request %s FROM %s %s %s %s", getUser(ctx), ctx.request().getRemoteAddress(), ctx.request().getContentType(), MESSAGE,
- jsonObject));
- respondWithCustomMsginJson(ctx, ApiException.UNAUTHORIZED_USER);
- return true;
- }
- return false;
- }
-
- private static void respondWithCustomMsginJson(DrumlinRequestContext ctx, ApiException apiException) {
- ctx.response()
- .sendErrorAndBody(apiException.httpStatusCode,
- apiException.toJSON().toString(), MimeTypes.kAppJson);
- }
-
- private static void safeClose(InputStream is) {
- if (is != null) {
- try {
- is.close();
- } catch (IOException e) {
- log.error("Error closing Input stream : " + e);
- }
- }
-
- }
-
- public static String schemaFileVersion(String version) {
- return CommonStartup.schemaFileJson.has(version) ?
- CommonStartup.schemaFileJson.getString(version) : CommonStartup.schemaFileJson.getString("v5");
- }
-
-}
-
diff --git a/src/main/scripts/VESrestfulCollector.sh b/src/main/scripts/VESrestfulCollector.sh
index 8462f4e2..7f6d17cb 100644
--- a/src/main/scripts/VESrestfulCollector.sh
+++ b/src/main/scripts/VESrestfulCollector.sh
@@ -36,7 +36,7 @@ start() {
-XX:ErrorFile=logs/java_error%p.log \
-XX:+HeapDumpOnOutOfMemoryError \
-Dhttps.protocols=TLSv1.1,TLSv1.2 \
- org.onap.dcae.commonFunction.CommonStartup $* & &>> logs/collector.log
+ org.onap.dcae.VesApplication $* & &>> logs/collector.log
}
stop() {
diff --git a/src/test/java/org/onap/dcae/ApplicationSettingsTest.java b/src/test/java/org/onap/dcae/ApplicationSettingsTest.java
index b162cef2..b483bcb6 100644
--- a/src/test/java/org/onap/dcae/ApplicationSettingsTest.java
+++ b/src/test/java/org/onap/dcae/ApplicationSettingsTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Objects;
@@ -153,7 +154,7 @@ public class ApplicationSettingsTest {
String passwordFileLocation = fromTemporaryConfiguration("collector.keystore.passwordfile=/somewhere/password").keystorePasswordFileLocation();
// then
- assertEquals("/somewhere/password", passwordFileLocation);
+ assertEquals(sanitizePath("/somewhere/password"), passwordFileLocation);
}
@Test
@@ -162,7 +163,7 @@ public class ApplicationSettingsTest {
String passwordFileLocation = fromTemporaryConfiguration().keystorePasswordFileLocation();
// then
- assertEquals("./etc/passwordfile", passwordFileLocation);
+ assertEquals(sanitizePath("etc/passwordfile"), passwordFileLocation);
}
@Test
@@ -172,7 +173,7 @@ public class ApplicationSettingsTest {
.keystoreFileLocation();
// then
- assertEquals("/somewhere/keystore", keystoreFileLocation);
+ assertEquals(sanitizePath("/somewhere/keystore"), keystoreFileLocation);
}
@Test
@@ -181,7 +182,7 @@ public class ApplicationSettingsTest {
String keystoreFileLocation = fromTemporaryConfiguration().keystoreFileLocation();
// then
- assertEquals("../etc/keystore", keystoreFileLocation);
+ assertEquals(sanitizePath("etc/keystore"), keystoreFileLocation);
}
@@ -209,7 +210,7 @@ public class ApplicationSettingsTest {
String dmaapConfigFileLocation = fromTemporaryConfiguration("collector.dmaapfile=/somewhere/dmaapFile").cambriaConfigurationFileLocation();
// then
- assertEquals("/somewhere/dmaapFile", dmaapConfigFileLocation);
+ assertEquals(sanitizePath("/somewhere/dmaapFile"), dmaapConfigFileLocation);
}
@Test
@@ -218,7 +219,7 @@ public class ApplicationSettingsTest {
String dmaapConfigFileLocation = fromTemporaryConfiguration().cambriaConfigurationFileLocation();
// then
- assertEquals("./etc/DmaapConfig.json", dmaapConfigFileLocation);
+ assertEquals(sanitizePath("etc/DmaapConfig.json"), dmaapConfigFileLocation);
}
@Test
@@ -347,25 +348,25 @@ public class ApplicationSettingsTest {
@Test
public void shouldReturnValidCredentials() throws IOException {
// when
- String userToBase64PasswordDelimitedByCommaSeparatedByPipes = fromTemporaryConfiguration(
- "header.authlist=pasza,123jsad1|someoneelse,12asd31"
+ Map<String, String> allowedUsers = fromTemporaryConfiguration(
+ "header.authlist=pasza,c2ltcGxlcGFzc3dvcmQNCg==|someoneelse,c2ltcGxlcGFzc3dvcmQNCg=="
).validAuthorizationCredentials();
// then
- assertEquals("pasza,123jsad1|someoneelse,12asd31", userToBase64PasswordDelimitedByCommaSeparatedByPipes);
+ assertEquals(allowedUsers.get("pasza").get(), "simplepassword");
+ assertEquals(allowedUsers.get("someoneelse").get(), "simplepassword");
}
@Test
public void shouldbyDefaultThereShouldBeNoValidCredentials() throws IOException {
// when
- String userToBase64PasswordDelimitedByCommaSeparatedByPipes = fromTemporaryConfiguration().
+ Map<String, String> userToBase64PasswordDelimitedByCommaSeparatedByPipes = fromTemporaryConfiguration().
validAuthorizationCredentials();
// then
- assertNull(userToBase64PasswordDelimitedByCommaSeparatedByPipes);
+ assertTrue(userToBase64PasswordDelimitedByCommaSeparatedByPipes.isEmpty());
}
-
@Test
public void shouldReturnIfEventTransformingIsEnabled() throws IOException {
// when
@@ -392,7 +393,7 @@ public class ApplicationSettingsTest {
.cambriaConfigurationFileLocation();
// then
- assertEquals("/somewhere/dmaapConfig", cambriaConfigurationFileLocation);
+ assertEquals(sanitizePath("/somewhere/dmaapConfig"), cambriaConfigurationFileLocation);
}
@Test
@@ -402,7 +403,7 @@ public class ApplicationSettingsTest {
.cambriaConfigurationFileLocation();
// then
- assertEquals("./etc/DmaapConfig.json", cambriaConfigurationFileLocation);
+ assertEquals(sanitizePath("etc/DmaapConfig.json"), cambriaConfigurationFileLocation);
}
private static ApplicationSettings fromTemporaryConfiguration(String... fileLines)
@@ -410,8 +411,10 @@ public class ApplicationSettingsTest {
File tempConfFile = File.createTempFile("doesNotMatter", "doesNotMatter");
Files.write(tempConfFile.toPath(), Arrays.asList(fileLines));
tempConfFile.deleteOnExit();
- return new ApplicationSettings(new String[]{"-c", tempConfFile.toString()}, args -> processCmdLine(args));
+ return new ApplicationSettings(new String[]{"-c", tempConfFile.toString()}, args -> processCmdLine(args), "");
}
-
+ private String sanitizePath(String path) {
+ return Paths.get(path).toString();
+ }
} \ No newline at end of file
diff --git a/src/test/java/org/onap/dcae/commonFunction/CommonStartupTest.java b/src/test/java/org/onap/dcae/commonFunction/CommonStartupTest.java
deleted file mode 100644
index 5a171484..00000000
--- a/src/test/java/org/onap/dcae/commonFunction/CommonStartupTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.commonFunction;
-
-import static java.util.Base64.getDecoder;
-import static java.util.Base64.getEncoder;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import com.att.nsa.cmdLine.NsaCommandLineUtil;
-import com.att.nsa.drumlin.service.framework.context.DrumlinRequest;
-import com.att.nsa.drumlin.till.nv.impl.nvReadableStack;
-import com.att.nsa.drumlin.till.nv.impl.nvReadableTable;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.loadException;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
-import com.att.nsa.security.authenticators.SimpleAuthenticator;
-import com.att.nsa.security.db.simple.NsaSimpleApiKey;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParser;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.json.JSONArray;
-import org.json.JSONObject;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.onap.dcae.ApplicationSettings;
-import org.onap.dcae.CLIUtils;
-import org.onap.dcae.commonFunction.CommonStartup.QueueFullException;
-import org.onap.dcae.commonFunction.event.publishing.EventPublisher;
-import org.onap.dcae.restapi.RestfulCollectorServlet;
-import org.onap.dcae.vestest.TestingUtilities;
-
-
-public class CommonStartupTest {
-
- @Test
- public void testParseCLIArguments() {
- // given
- String args[] = {"-a", "aa"};
- Map<String, String> argMap = NsaCommandLineUtil.processCmdLine(args, true);
- // when
- nvReadableStack settings = new nvReadableStack();
- settings.push(new nvReadableTable(argMap));
-
- // then
- assertEquals(settings.getString("a", "default"), "aa");
- }
-
- @Test
- public void shouldPutValidVESEventOnProcessingQueueWithoutExceptions() throws IOException, QueueFullException {
- // given
- CommonStartup.fProcessingInputQueue = new LinkedBlockingQueue<>(
- CommonStartup.maxQueueEvent);
- JsonElement vesEvent = new JsonParser().parse(new FileReader("src/test/resources/VES_valid.txt"));
- JSONObject validVESEvent = new JSONObject(vesEvent.toString());
- JSONArray jsonArrayMod = new JSONArray().put(validVESEvent);
-
- // then
- CommonStartup.handleEvents(jsonArrayMod);
- }
-
-
- @Test
- public void testParseStreamIdToStreamHashMapping() {
- // given
-
- CommonStartup.streamID = TestingUtilities.convertDMaaPStreamsPropertyToMap("fault=sec_fault|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert|voiceQuality=ves_voicequality|sipSignaling=ves_sipsignaling");
- EventProcessor eventProcessor = new EventProcessor(mock(EventPublisher.class));
- // when
- Map<String, String[]> streamHashMapping = EventProcessor.streamidHash;
-
- // then
- assertEquals(streamHashMapping.get("fault")[0], "sec_fault");
- assertEquals(streamHashMapping.get("measurementsForVfScaling")[0], "sec_measurement");
- }
-
- @Test
- public void testAuthListHandler() throws loadException, missingReqdSetting {
- // given
- ApplicationSettings settings = new ApplicationSettings(new String[]{}, CLIUtils::processCmdLine);
-
- String user1 = "secureid";
- String password1Hashed = "IWRjYWVSb2FkbTEyMyEt";
- String password1UnHashed = new String(getDecoder().decode("IWRjYWVSb2FkbTEyMyEt"));
- String user2 = "sample1";
- String password2Hashed = "c2FtcGxlMQ";
-
- String authlist = user1 + "," + password1Hashed + "|" + user2 + "," + password2Hashed;
-
- RestfulCollectorServlet rsv = new RestfulCollectorServlet(settings);
-
- DrumlinRequest drumlinRequestMock = Mockito.mock(DrumlinRequest.class);
-
- String basicHeaderForUser1 = "Basic " + getEncoder().encodeToString((user1 + ":" + password1UnHashed).getBytes());
- when(drumlinRequestMock.getFirstHeader("Authorization")).thenReturn(basicHeaderForUser1);
-
- // when
- SimpleAuthenticator simpleAuthenticator = (SimpleAuthenticator) rsv.createAuthenticator(authlist);
- NsaSimpleApiKey authentic = simpleAuthenticator.isAuthentic(drumlinRequestMock);
-
- // then
- assertEquals(authentic.getSecret(), password1UnHashed);
- }
-}
-
-
diff --git a/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java b/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java
index 77ef005f..f5c5d5f9 100644
--- a/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java
+++ b/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java
@@ -21,40 +21,42 @@
package org.onap.dcae.commonFunction;
import com.google.gson.Gson;
-import java.util.concurrent.atomic.AtomicReference;
-
-import io.vavr.collection.HashMap;
+import io.vavr.collection.Map;
import org.json.JSONObject;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
-
-import java.util.List;
+import org.onap.dcae.ApplicationSettings;
+import org.onap.dcae.CLIUtils;
import org.onap.dcae.commonFunction.event.publishing.EventPublisher;
import org.onap.dcae.vestest.TestingUtilities;
+import java.util.List;
+
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
import static org.onap.dcae.commonFunction.EventProcessor.EVENT_LIST_TYPE;
public class EventProcessorTest {
private final String ev = "{\"event\": {\"commonEventHeader\": { \"reportingEntityName\": \"VM name will be provided by ECOMP\", \"startEpochMicrosec\": 1477012779802988,\"lastEpochMicrosec\": 1477012789802988,\"eventId\": \"83\",\"sourceName\": \"Dummy VM name - No Metadata available\",\"sequence\": 83,\"priority\": \"Normal\",\"functionalRole\": \"vFirewall\",\"domain\": \"measurementsForVfScaling\",\"reportingEntityId\": \"VM UUID will be provided by ECOMP\",\"sourceId\": \"Dummy VM UUID - No Metadata available\",\"version\": 1.1},\"measurementsForVfScalingFields\": {\"measurementInterval\": 10,\"measurementsForVfScalingVersion\": 1.1,\"vNicUsageArray\": [{\"multicastPacketsIn\": 0,\"bytesIn\": 3896,\"unicastPacketsIn\": 0, \"multicastPacketsOut\": 0,\"broadcastPacketsOut\": 0, \"packetsOut\": 28,\"bytesOut\": 12178,\"broadcastPacketsIn\": 0,\"packetsIn\": 58,\"unicastPacketsOut\": 0,\"vNicIdentifier\": \"eth0\"}]}}}";
+ Map<String, String[]> streamID;
+ private ApplicationSettings properties;
+
@Before
public void setUp() {
- CommonStartup.streamID = TestingUtilities.convertDMaaPStreamsPropertyToMap("fault=sec_fault|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert|voiceQuality=ves_voicequality|sipSignaling=ves_sipsignaling");
+ streamID = TestingUtilities.convertDMaaPStreamsPropertyToMap("fault=sec_fault|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert|voiceQuality=ves_voicequality|sipSignaling=ves_sipsignaling");
+ properties = new ApplicationSettings(new String[]{}, CLIUtils::processCmdLine);
+ streamID = properties.dMaaPStreamsMapping();
}
@Test
public void testLoad() {
//given
- EventProcessor ec = new EventProcessor(mock(EventPublisher.class));
+ EventProcessor ec = new EventProcessor(mock(EventPublisher.class), properties);
ec.event = new org.json.JSONObject(ev);
//when
ec.overrideEvent();
@@ -67,7 +69,7 @@ public class EventProcessorTest {
@Test
public void shouldParseJsonEvents() throws ReflectiveOperationException {
//given
- EventProcessor eventProcessor = new EventProcessor(mock(EventPublisher.class));
+ EventProcessor eventProcessor = new EventProcessor(mock(EventPublisher.class), properties);
String event_json = "[{ \"filter\": {\"event.commonEventHeader.domain\":\"heartbeat\",\"VESversion\":\"v4\"},\"processors\":[" +
"{\"functionName\": \"concatenateValue\",\"args\":{\"field\":\"event.commonEventHeader.eventName\",\"concatenate\": [\"$event.commonEventHeader.domain\",\"$event.commonEventHeader.eventType\",\"$event.faultFields.alarmCondition\"], \"delimiter\":\"_\"}}" +
",{\"functionName\": \"addAttribute\",\"args\":{\"field\": \"event.heartbeatFields.heartbeatFieldsVersion\",\"value\": \"1.0\",\"fieldType\": \"number\"}}" +
@@ -86,5 +88,4 @@ public class EventProcessorTest {
assertThat(stringArgumentCaptor.getAllValues()).contains("concatenateValue", "addAttribute", "map");
}
-}
-
+} \ No newline at end of file
diff --git a/src/test/java/org/onap/dcae/vestest/TestConfigProcessor.java b/src/test/java/org/onap/dcae/vestest/TestConfigProcessor.java
index 49b53d24..09517ae3 100644
--- a/src/test/java/org/onap/dcae/vestest/TestConfigProcessor.java
+++ b/src/test/java/org/onap/dcae/vestest/TestConfigProcessor.java
@@ -64,11 +64,11 @@ public class TestConfigProcessor {
final JSONObject jsonObject = getFileAsJsonObject();
final String functionRole = (jsonObject.getJSONObject("event")).getJSONObject("commonEventHeader")
- .get("functionalRole").toString();
+ .get("functionalRole").toString();
System.out.println("event==" + jsonObject.toString());
System.out.println("functionRole==" + functionRole);
final JSONObject jsonArgs = new JSONObject(
- "{\"field\": \"event.commonEventHeader.nfNamingCode\",\"oldField\": \"event.commonEventHeader.functionalRole\"}");
+ "{\"field\": \"event.commonEventHeader.nfNamingCode\",\"oldField\": \"event.commonEventHeader.functionalRole\"}");
ConfigProcessors cpEvent = new ConfigProcessors(jsonObject);
cpEvent.map(jsonArgs);
final String responseData = cpEvent.getEventObjectVal("event.commonEventHeader.nfNamingCode").toString();
@@ -82,15 +82,15 @@ public class TestConfigProcessor {
final JSONObject jsonObject = getFileAsJsonObject();
final String alarmAdditionalInformation = (jsonObject.getJSONObject("event")).getJSONObject("faultFields")
- .get("alarmAdditionalInformation").toString();
+ .get("alarmAdditionalInformation").toString();
System.out.println("event==" + jsonObject.toString());
System.out.println("alarmAdditionalInformation==" + alarmAdditionalInformation);
final JSONObject jsonArgs = new JSONObject(
- "{\"field\": \"event.faultFields.eventAdditionalInformation\",\"oldField\": \"event.faultFields.alarmAdditionalInformation\"}");
+ "{\"field\": \"event.faultFields.eventAdditionalInformation\",\"oldField\": \"event.faultFields.alarmAdditionalInformation\"}");
ConfigProcessors cpEvent = new ConfigProcessors(jsonObject);
cpEvent.map(jsonArgs);
final String responseData = cpEvent.getEventObjectVal("event.faultFields.eventAdditionalInformation")
- .toString();
+ .toString();
System.out.println("modified event==" + jsonObject.toString());
System.out.println("responseData==" + responseData);
assertEquals(alarmAdditionalInformation, responseData);
@@ -104,14 +104,14 @@ public class TestConfigProcessor {
System.out.println("event==" + jsonObject.toString());
//System.out.println("alarmAdditionalInformation==" + alarmAdditionalInformation);
final JSONObject jsonArgs = new JSONObject(
- "{\"field\": \"event.faultFields.vNicPerformanceArray[]\",\"oldField\": \"event.faultFields.errors\",\"attrMap\":{\"receiveDiscards\":\"receivedDiscardedPacketsAccumulated\"}}");
+ "{\"field\": \"event.faultFields.vNicPerformanceArray[]\",\"oldField\": \"event.faultFields.errors\",\"attrMap\":{\"receiveDiscards\":\"receivedDiscardedPacketsAccumulated\"}}");
ConfigProcessors cpEvent = new ConfigProcessors(jsonObject);
final String receiveDiscards = cpEvent.getEventObjectVal("event.faultFields.errors.receiveDiscards").toString();
System.out.println("receiveDiscards==" + receiveDiscards);
cpEvent.map(jsonArgs);
final String responseData = cpEvent
- .getEventObjectVal("event.faultFields.vNicPerformanceArray[0].receivedDiscardedPacketsAccumulated")
- .toString();
+ .getEventObjectVal("event.faultFields.vNicPerformanceArray[0].receivedDiscardedPacketsAccumulated")
+ .toString();
System.out.println("modified event==" + jsonObject.toString());
System.out.println("responseData==" + responseData);
assertEquals(receiveDiscards, responseData);
@@ -125,7 +125,7 @@ public class TestConfigProcessor {
System.out.println("event==" + jsonObject.toString());
//System.out.println("functionRole==" + functionRole);
final JSONObject jsonArgs = new JSONObject(
- "{\"field\": \"event.faultFields.version\",\"value\": \"2.0\",\"fieldType\": \"number\"}");
+ "{\"field\": \"event.faultFields.version\",\"value\": \"2.0\",\"fieldType\": \"number\"}");
ConfigProcessors cpEvent = new ConfigProcessors(jsonObject);
cpEvent.addAttribute(jsonArgs);
final String responseData = cpEvent.getEventObjectVal("event.faultFields.version").toString();
@@ -142,7 +142,7 @@ public class TestConfigProcessor {
System.out.println("event==" + jsonObject.toString());
//System.out.println("functionRole==" + functionRole);
final JSONObject jsonArgs = new JSONObject(
- "{\"field\": \"event.faultFields.version\",\"value\": \"2.0\",\"fieldType\": \"number\"}");
+ "{\"field\": \"event.faultFields.version\",\"value\": \"2.0\",\"fieldType\": \"number\"}");
ConfigProcessors cpEvent = new ConfigProcessors(jsonObject);
cpEvent.updateAttribute(jsonArgs);
final String responseData = cpEvent.getEventObjectVal("event.faultFields.version").toString();
@@ -156,16 +156,16 @@ public class TestConfigProcessor {
final JSONObject jsonObject = getFileAsJsonObject();
final String eventType = (jsonObject.getJSONObject("event")).getJSONObject("commonEventHeader").get("eventType")
- .toString();
+ .toString();
final String domain = (jsonObject.getJSONObject("event")).getJSONObject("commonEventHeader").get("domain")
- .toString();
+ .toString();
final String alarmCondition = (jsonObject.getJSONObject("event")).getJSONObject("faultFields")
- .get("alarmCondition").toString();
+ .get("alarmCondition").toString();
System.out.println("event==" + jsonObject.toString());
final String eventName = domain + "_" + eventType + "_" + alarmCondition;
System.out.println("eventName==" + eventName);
final JSONObject jsonArgs = new JSONObject(
- "{\"field\":\"event.commonEventHeader.eventName\",\"concatenate\": [\"$event.commonEventHeader.domain\",\"$event.commonEventHeader.eventType\",\"$event.faultFields.alarmCondition\"],\"delimiter\":\"_\"}");
+ "{\"field\":\"event.commonEventHeader.eventName\",\"concatenate\": [\"$event.commonEventHeader.domain\",\"$event.commonEventHeader.eventType\",\"$event.faultFields.alarmCondition\"],\"delimiter\":\"_\"}");
ConfigProcessors cpEvent = new ConfigProcessors(jsonObject);
cpEvent.concatenateValue(jsonArgs);
final String responseData = cpEvent.getEventObjectVal("event.commonEventHeader.eventName").toString();
@@ -179,14 +179,14 @@ public class TestConfigProcessor {
final JSONObject jsonObject = getFileAsJsonObject();
final String memoryConfigured = (jsonObject.getJSONObject("event")).getJSONObject("faultFields")
- .get("memoryConfigured").toString();
+ .get("memoryConfigured").toString();
final String memoryUsed = (jsonObject.getJSONObject("event")).getJSONObject("faultFields").get("memoryUsed")
- .toString();
+ .toString();
System.out.println("event==" + jsonObject.toString());
System.out.println("memoryConfigured==" + memoryConfigured);
System.out.println("memoryUsed==" + memoryUsed);
final JSONObject jsonArgs = new JSONObject(
- "{\"field\": \"event.faultFields.memoryFree\",\"subtract\": [\"$event.faultFields.memoryConfigured\",\"$event.faultFields.memoryUsed\"]}");
+ "{\"field\": \"event.faultFields.memoryFree\",\"subtract\": [\"$event.faultFields.memoryConfigured\",\"$event.faultFields.memoryUsed\"]}");
ConfigProcessors cpEvent = new ConfigProcessors(jsonObject);
cpEvent.subtractValue(jsonArgs);
final String responseData = cpEvent.getEventObjectVal("event.faultFields.memoryFree").toString();
@@ -202,7 +202,7 @@ public class TestConfigProcessor {
System.out.println("event==" + jsonObject.toString());
System.out.println("Testing SetValue");
final JSONObject jsonArgs = new JSONObject(
- "{\"field\": \"event.faultFields.version\",\"value\": \"2.0\",\"fieldType\": \"number\"}");
+ "{\"field\": \"event.faultFields.version\",\"value\": \"2.0\",\"fieldType\": \"number\"}");
ConfigProcessors cpEvent = new ConfigProcessors(jsonObject);
cpEvent.setValue(jsonArgs);
final String responseData = cpEvent.getEventObjectVal("event.faultFields.version").toString();
diff --git a/src/test/java/org/onap/dcae/vestest/TestEventReceipt.java b/src/test/java/org/onap/dcae/vestest/TestEventReceipt.java
deleted file mode 100644
index a3893eaa..00000000
--- a/src/test/java/org/onap/dcae/vestest/TestEventReceipt.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.vestest;
-
-import static org.junit.Assert.assertEquals;
-
-import org.json.JSONObject;
-import org.junit.Test;
-import org.onap.dcae.commonFunction.CommonStartup;
-import org.onap.dcae.restapi.endpoints.EventReceipt;
-
-public class TestEventReceipt {
-
-
- @Test
- public void shouldGetSchemaFileLocationBasedOnVersion() {
- CommonStartup.schemaFileJson = new JSONObject("{\"v1\":\"filePath1\", \"v5\":\"filePath2\"}");
- String schemaFilePath = EventReceipt.schemaFileVersion("v5");
- assertEquals(schemaFilePath, "filePath2");
- }
-
- @Test
- public void shouldByDefaultReturnV5SchemaFileLocation() {
- CommonStartup.schemaFileJson = new JSONObject("{\"v1\":\"filePath1\", \"v5\":\"filePath2\"}");
- String schemaFilePath = EventReceipt.schemaFileVersion("v2");
- assertEquals(schemaFilePath, "filePath2");
- }
-
-}
diff --git a/src/test/java/org/onap/dcae/vestest/TestJsonSchemaValidation.java b/src/test/java/org/onap/dcae/vestest/TestJsonSchemaValidation.java
index 0489811d..c39fb013 100644
--- a/src/test/java/org/onap/dcae/vestest/TestJsonSchemaValidation.java
+++ b/src/test/java/org/onap/dcae/vestest/TestJsonSchemaValidation.java
@@ -28,24 +28,24 @@ import com.google.gson.JsonParser;
import java.io.IOException;
import java.nio.file.Paths;
import org.junit.Test;
-import org.onap.dcae.commonFunction.CommonStartup;
+import org.onap.dcae.SchemaValidator;
public class TestJsonSchemaValidation {
@Test
public void shouldValidEventPassSchema_27_2() throws IOException {
- String result = CommonStartup.validateAgainstSchema(
- readJSONFromFile("src/test/resources/VES_valid.txt").toString(),
- readJSONFromFile("etc/CommonEventFormat_27.2.json").toString());
+ String result = SchemaValidator.validateAgainstSchema(
+ readJSONFromFile("src/test/resources/VES_valid.txt").toString(),
+ readJSONFromFile("etc/CommonEventFormat_27.2.json").toString());
assertEquals(result, "true");
}
@Test
public void shouldInvalidEventDoesNotPassSchema_27_2() throws IOException {
- String result = CommonStartup.validateAgainstSchema(
- readJSONFromFile("src/test/resources/VES_invalid.txt").toString(),
- readJSONFromFile("etc/CommonEventFormat_27.2.json").toString());
+ String result = SchemaValidator.validateAgainstSchema(
+ readJSONFromFile("src/test/resources/VES_invalid.txt").toString(),
+ readJSONFromFile("etc/CommonEventFormat_27.2.json").toString());
assertEquals(result, "false");
}