summaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/org/onap/dcae/ApplicationSettings.java176
-rw-r--r--src/main/java/org/onap/dcae/CLIUtils.java60
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/AnyNode.java113
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/CommonStartup.java514
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java1068
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java128
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java213
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/Event.java34
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/EventProcessor.java365
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/EventPublisher.java180
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/Processor.java33
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/VESLogger.java242
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java107
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java99
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java62
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java124
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/EventPublisher.java38
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java98
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java51
-rw-r--r--src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java308
-rw-r--r--src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java178
-rw-r--r--src/main/java/org/onap/dcae/restapi/ApiException.java70
-rw-r--r--src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java288
-rw-r--r--src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java525
-rw-r--r--src/main/resources/seclogger.yaml19
-rw-r--r--src/main/scripts/VESConfigPoller.sh125
-rw-r--r--src/main/scripts/VESrestfulCollector.sh275
-rw-r--r--src/main/scripts/VESrestfulCollector_Status.sh41
-rw-r--r--src/main/scripts/docker-entry.sh107
-rw-r--r--src/main/scripts/logger.sh58
-rw-r--r--src/main/scripts/reconfigure.sh18
-rw-r--r--src/main/scripts/run-dcae-controller-ves-collector-daemon.sh39
-rw-r--r--src/main/scripts/run-dcae-controller-ves-collector-interactive.sh39
33 files changed, 3048 insertions, 2747 deletions
diff --git a/src/main/java/org/onap/dcae/ApplicationSettings.java b/src/main/java/org/onap/dcae/ApplicationSettings.java
new file mode 100644
index 00000000..0ebd1e90
--- /dev/null
+++ b/src/main/java/org/onap/dcae/ApplicationSettings.java
@@ -0,0 +1,176 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.s
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae;
+
+import com.att.nsa.drumlin.till.nv.impl.nvReadableStack;
+import com.att.nsa.drumlin.till.nv.impl.nvReadableTable;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import com.google.common.annotations.VisibleForTesting;
+import io.vavr.Function1;
+import io.vavr.collection.HashMap;
+import io.vavr.collection.Map;
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.file.Paths;
+
+/**
+ * Abstraction over application configuration.
+ * Its job is to provide easily discoverable (by method names lookup) and type safe access to configuration properties.
+ */
+public class ApplicationSettings {
+
+ private static final Logger inlog = LoggerFactory.getLogger(ApplicationSettings.class);
+ private static final String COLLECTOR_PROPERTIES = "etc/collector.properties";
+ private final PropertiesConfiguration properties = new PropertiesConfiguration();
+
+ public ApplicationSettings(String[] args, Function1<String[], Map<String, String>> argsParser) {
+ properties.setDelimiterParsingDisabled(true);
+ Map<String, String> parsedArgs = argsParser.apply(args);
+ loadProperties(Paths.get(new File(COLLECTOR_PROPERTIES).getAbsolutePath()).toString());
+ loadCommandLineProperties(parsedArgs);
+ parsedArgs.filterKeys(k -> !k.equals("c")).forEach(this::updateProperty);
+ }
+
+ private void loadCommandLineProperties(Map<String, String> parsedArgs) {
+ parsedArgs.get("c").forEach(e -> {
+ properties.clear();
+ loadProperties(e);
+ });
+ }
+
+ private void loadProperties(String property){
+ try {
+ properties.load(property);
+ } catch (ConfigurationException ex) {
+ inlog.error("Cannot load properties cause:", ex);
+ throw new RuntimeException(ex);
+ }
+ }
+
+ public String validAuthorizationCredentials() {
+ return properties.getString("header.authlist", null);
+ }
+
+ public int maximumAllowedQueuedEvents() {
+ return properties.getInt("collector.inputQueue.maxPending", 1024 * 4);
+ }
+
+ public boolean jsonSchemaValidationEnabled() {
+ return properties.getInt("collector.schema.checkflag", -1) > 0;
+ }
+
+ public boolean authorizationEnabled() {
+ return properties.getInt("header.authflag", 0) > 0;
+ }
+
+ public JSONObject jsonSchema() {
+ return new JSONObject(
+ properties.getString("collector.schema.file", "{\"v5\":\"./etc/CommonEventFormat_28.3.json\"}"));
+ }
+
+ public int httpPort() {
+ return properties.getInt("collector.service.port", 8080);
+ }
+
+ public int httpsPort() {
+ return properties.getInt("collector.service.secure.port", 8443);
+ }
+
+ public boolean httpsEnabled() {
+ return httpsPort() > 0;
+ }
+
+ public boolean eventTransformingEnabled() {
+ return properties.getInt("event.transform.flag", 1) > 0;
+ }
+
+ public String keystorePasswordFileLocation() {
+ return properties.getString("collector.keystore.passwordfile", "./etc/passwordfile");
+ }
+
+ public String keystoreFileLocation() {
+ return properties.getString("collector.keystore.file.location", "../etc/keystore");
+ }
+
+ public String keystoreAlias() {
+ return properties.getString("collector.keystore.alias", "tomcat");
+ }
+
+ public String exceptionConfigFileLocation() {
+ return properties.getString("exceptionConfig", null);
+ }
+
+ public String cambriaConfigurationFileLocation() {
+ return properties.getString("collector.dmaapfile", "./etc/DmaapConfig.json");
+ }
+
+ public Map<String, String[]> dMaaPStreamsMapping() {
+ String streamIdsProperty = properties.getString("collector.dmaap.streamid", null);
+ if (streamIdsProperty == null) {
+ return HashMap.empty();
+ } else {
+ return convertDMaaPStreamsPropertyToMap(streamIdsProperty);
+ }
+ }
+
+ /*
+ * Kept back here for backward compatibility.
+ * RestfulCollectorServlet upon its initialization requires options to be represented
+ * as object represented by rrNvReadable interface, so we define a a handy transformation function here.
+ */
+ public rrNvReadable torrNvReadable() {
+ final nvReadableStack settings = new nvReadableStack();
+ settings.push(new nvReadableTable(ConfigurationConverter.getProperties(properties)));
+ return settings;
+ }
+
+ private Map<String, String[]> convertDMaaPStreamsPropertyToMap(String streamIdsProperty) {
+ java.util.HashMap<String, String[]> domainToStreamIdsMapping = new java.util.HashMap<>();
+ String[] topics = streamIdsProperty.split("\\|");
+ for (String t : topics) {
+ String domain = t.split("=")[0];
+ String[] streamIds = t.split("=")[1].split(",");
+ domainToStreamIdsMapping.put(domain, streamIds);
+ }
+ return HashMap.ofAll(domainToStreamIdsMapping);
+ }
+
+ private void updateProperty(String key, String value) {
+ if (properties.containsKey(key)) {
+ properties.setProperty(key, value);
+ } else {
+ properties.addProperty(key, value);
+ }
+ }
+
+ @VisibleForTesting
+ String getStringDirectly(String key) {
+ return properties.getString(key);
+ }
+}
+
diff --git a/src/main/java/org/onap/dcae/CLIUtils.java b/src/main/java/org/onap/dcae/CLIUtils.java
new file mode 100644
index 00000000..6450d2e5
--- /dev/null
+++ b/src/main/java/org/onap/dcae/CLIUtils.java
@@ -0,0 +1,60 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae;
+
+import java.util.HashMap;
+
+/**
+ * CLIUtils extracted from nsaServerLibrary this implementation will be removed once we switch to different API library
+ */
+public class CLIUtils {
+
+ public static io.vavr.collection.HashMap<String, String> processCmdLine (String[] args) {
+ final HashMap<String,String> map = new HashMap<String,String> ();
+
+ String lastKey = null;
+ for ( String arg : args )
+ {
+ if ( arg.startsWith ( "-" ) )
+ {
+ if ( lastKey != null )
+ {
+ map.put ( lastKey.substring(1), "" );
+ }
+ lastKey = arg;
+ }
+ else
+ {
+ if ( lastKey != null )
+ {
+ map.put ( lastKey.substring(1), arg );
+ }
+ lastKey = null;
+ }
+ }
+ if ( lastKey != null )
+ {
+ map.put ( lastKey.substring(1), "" );
+ }
+ return io.vavr.collection.HashMap.ofAll(map);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/AnyNode.java b/src/main/java/org/onap/dcae/commonFunction/AnyNode.java
new file mode 100644
index 00000000..97d73ddd
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/AnyNode.java
@@ -0,0 +1,113 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2018 Nokia Networks Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.commonFunction;
+
+import static io.vavr.API.Set;
+
+import io.vavr.collection.List;
+import io.vavr.collection.Set;
+import io.vavr.control.Option;
+import java.util.stream.StreamSupport;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * This class is a wrapper for 2 most used entities of org.json lib: JSONArray and JSONObject and comprises utility
+ * methods for fast access of json structures without need to explicitly coerce between them. While using this, bear in
+ * mind it does not contain exception handling - it is assumed that when using, the parsed json structure is known.
+ *
+ * @author koblosz
+ */
+public class AnyNode {
+
+ private Object obj;
+
+ private AnyNode(Object object) {
+ this.obj = object;
+ }
+
+ public static AnyNode fromString(String content) {
+ return new AnyNode(new JSONObject(content));
+ }
+
+ /**
+ * Returns key set of underlying object. It is assumed that underlying object is of type org.json.JSONObject.
+ */
+ public Set<String> keys() {
+ return Set(asJsonObject().keySet().toArray(new String[]{}));
+ }
+
+ /**
+ * Returns value associated with specified key wrapped with AnyValue object. It is assumed that this is of type
+ * org.json.JSONObject.
+ */
+ public AnyNode get(String key) {
+ return new AnyNode(asJsonObject().get(key));
+ }
+
+ /**
+ * Returns string representation of this. If it happens to have null, the value is treated as
+ * org.json.JSONObject.NULL and "null" string is returned then.
+ */
+ public String toString() {
+ return this.obj.toString();
+ }
+
+ /**
+ * Returns optional of object under specified key, wrapped with AnyNode object.
+ * If underlying object is not of type org.json.JSONObject
+ * or underlying object has no given key
+ * or given key is null
+ * then Optional.empty will be returned.
+ */
+ public Option<AnyNode> getAsOption(String key) {
+ try {
+ AnyNode value = get(key);
+ if (value.toString().equals("null")) {
+ return Option.none();
+ }
+ return Option.some(value);
+ } catch (JSONException ex) {
+ return Option.none();
+ }
+ }
+
+ /**
+ * Converts underlying object to map representation with map values wrapped with AnyNode object. It is assumed that
+ * underlying object is of type org.json.JSONObject.
+ */
+ public List<AnyNode> toList() {
+ return List.ofAll(StreamSupport.stream(((JSONArray) this.obj).spliterator(), false).map(AnyNode::new));
+ }
+
+ /**
+ * Checks if specified key is present in this. It is assumed that this is of type JSONObject.
+ */
+ public boolean has(String key) {
+ return !getAsOption(key).isEmpty();
+ }
+
+ private JSONObject asJsonObject() {
+ return (JSONObject) this.obj;
+ }
+
+
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java
index b974ed53..36713aa4 100644
--- a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java
+++ b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java
@@ -1,310 +1,204 @@
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.commonFunction;
-
-import com.att.nsa.apiServer.ApiServer;
-import com.att.nsa.apiServer.ApiServerConnector;
-import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;
-import com.att.nsa.cmdLine.NsaCommandLineUtil;
-import com.att.nsa.drumlin.service.framework.DrumlinServlet;
-import com.att.nsa.drumlin.till.nv.impl.nvPropertiesFile;
-import com.att.nsa.drumlin.till.nv.impl.nvReadableStack;
-import com.att.nsa.drumlin.till.nv.impl.nvReadableTable;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.loadException;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.github.fge.jsonschema.exceptions.ProcessingException;
-import com.github.fge.jsonschema.main.JsonSchema;
-import com.github.fge.jsonschema.main.JsonSchemaFactory;
-import com.github.fge.jsonschema.report.ProcessingMessage;
-import com.github.fge.jsonschema.report.ProcessingReport;
-import com.github.fge.jsonschema.util.JsonLoader;
-import org.apache.catalina.LifecycleException;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.onap.dcae.restapi.RestfulCollectorServlet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URL;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import javax.servlet.ServletException;
-
-
-public class CommonStartup extends NsaBaseEndpoint implements Runnable {
-
- public static final String KCONFIG = "c";
-
- public static final String KSETTING_PORT = "collector.service.port";
- public static final int KDEFAULT_PORT = 8080;
-
- public static final String KSETTING_SECUREPORT = "collector.service.secure.port";
- public static final int KDEFAULT_SECUREPORT = -1;
-
- public static final String KSETTING_KEYSTOREPASSFILE = "collector.keystore.passwordfile";
- public static final String KDEFAULT_KEYSTOREPASSFILE = "../etc/passwordfile";
- public static final String KSETTING_KEYSTOREFILE = "collector.keystore.file.location";
- public static final String KDEFAULT_KEYSTOREFILE = "../etc/keystore";
- public static final String KSETTING_KEYALIAS = "collector.keystore.alias";
- public static final String KDEFAULT_KEYALIAS = "tomcat";
-
- public static final String KSETTING_DMAAPCONFIGS = "collector.dmaapfile";
- protected static final String[] KDEFAULT_DMAAPCONFIGS = new String[] { "/etc/DmaapConfig.json" };
-
- public static final String KSETTING_MAXQUEUEDEVENTS = "collector.inputQueue.maxPending";
- public static final int KDEFAULT_MAXQUEUEDEVENTS = 1024 * 4;
-
- public static final String KSETTING_SCHEMAVALIDATOR = "collector.schema.checkflag";
- public static final int KDEFAULT_SCHEMAVALIDATOR = -1;
-
- public static final String KSETTING_SCHEMAFILE = "collector.schema.file";
- public static final String KDEFAULT_SCHEMAFILE = "{\"v5\":\"./etc/CommonEventFormat_28.3.json\"}";
- public static final String KSETTING_EXCEPTIONCONFIG = "exceptionConfig";
-
- public static final String KSETTING_DMAAPSTREAMID = "collector.dmaap.streamid";
-
- public static final String KSETTING_AUTHFLAG = "header.authflag";
- public static final int KDEFAULT_AUTHFLAG = 0;
-
- public static final String kSetting_authlist = "header.authlist";
-
- public static final String KSETTING_EVENTTRANSFORMFLAG = "event.transform.flag";
- public static final int KDEFAULT_EVENTTRANSFORMFLAG = 1;
-
- public static final Logger inlog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.input");
- public static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.output");
- public static final Logger eplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.error");
- public static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");
-
- public static int schema_Validatorflag = -1;
- public static int authflag = 1;
- public static int eventTransformFlag = 1;
- public static String schemaFile;
- public static JSONObject schemaFileJson;
- public static String exceptionConfig;
- public static String cambriaConfigFile;
- private boolean listnerstatus;
- public static String streamid;
-
- private CommonStartup(rrNvReadable settings) throws loadException, IOException, rrNvReadable.missingReqdSetting,
- rrNvReadable.invalidSettingValue, ServletException, InterruptedException {
- final List<ApiServerConnector> connectors = new LinkedList<ApiServerConnector>();
-
- if (settings.getInt(KSETTING_PORT, KDEFAULT_PORT) > 0) {
- // http service
- connectors.add(new ApiServerConnector.Builder(settings.getInt(KSETTING_PORT, KDEFAULT_PORT)).secure(false)
- .build());
- }
-
- // optional https service
- final int securePort = settings.getInt(KSETTING_SECUREPORT, KDEFAULT_SECUREPORT);
- final String keystoreFile = settings.getString(KSETTING_KEYSTOREFILE, KDEFAULT_KEYSTOREFILE);
- final String keystorePasswordFile = settings.getString(KSETTING_KEYSTOREPASSFILE, KDEFAULT_KEYSTOREPASSFILE);
- final String keyAlias = settings.getString(KSETTING_KEYALIAS, KDEFAULT_KEYALIAS);
-
- if (securePort > 0) {
- final String KSETTING_KEYSTOREPASS = readFile(keystorePasswordFile, Charset.defaultCharset());
- connectors.add(new ApiServerConnector.Builder(securePort).secure(true)
- .keystorePassword(KSETTING_KEYSTOREPASS).keystoreFile(keystoreFile).keyAlias(keyAlias).build());
-
- }
-
- // Reading other config properties
-
- schema_Validatorflag = settings.getInt(KSETTING_SCHEMAVALIDATOR, KDEFAULT_SCHEMAVALIDATOR);
- if (schema_Validatorflag > 0) {
- schemaFile = settings.getString(KSETTING_SCHEMAFILE, KDEFAULT_SCHEMAFILE);
- // System.out.println("SchemaFile:" + schemaFile);
- schemaFileJson = new JSONObject(schemaFile);
-
- }
- exceptionConfig = settings.getString(KSETTING_EXCEPTIONCONFIG, null);
- authflag = settings.getInt(CommonStartup.KSETTING_AUTHFLAG, CommonStartup.KDEFAULT_AUTHFLAG);
- String[] currentconffile = settings.getStrings(CommonStartup.KSETTING_DMAAPCONFIGS,
- CommonStartup.KDEFAULT_DMAAPCONFIGS);
- cambriaConfigFile = currentconffile[0];
- streamid = settings.getString(KSETTING_DMAAPSTREAMID, null);
- eventTransformFlag = settings.getInt(KSETTING_EVENTTRANSFORMFLAG, KDEFAULT_EVENTTRANSFORMFLAG);
-
- fTomcatServer = new ApiServer.Builder(connectors, new RestfulCollectorServlet(settings)).encodeSlashes(true)
- .name("collector").build();
-
- // Load override exception map
- CustomExceptionLoader.LoadMap();
- setListnerstatus(true);
- }
-
- public static void main(String[] args) {
- ExecutorService executor = null;
- try {
- // process command line arguments
- final Map<String, String> argMap = NsaCommandLineUtil.processCmdLine(args, true);
- final String config = NsaCommandLineUtil.getSetting(argMap, KCONFIG, "collector.properties");
- final URL settingStream = DrumlinServlet.findStream(config, CommonStartup.class);
-
- final nvReadableStack settings = new nvReadableStack();
- settings.push(new nvPropertiesFile(settingStream));
- settings.push(new nvReadableTable(argMap));
-
- fProcessingInputQueue = new LinkedBlockingQueue<JSONObject>(CommonStartup.KDEFAULT_MAXQUEUEDEVENTS);
-
- VESLogger.setUpEcompLogging();
-
- CommonStartup cs = new CommonStartup(settings);
-
- Thread csmain = new Thread(cs);
- csmain.start();
-
- EventProcessor ep = new EventProcessor();
- // Thread epThread=new Thread(ep);
- // epThread.start();
- executor = Executors.newFixedThreadPool(20);
- executor.execute(ep);
-
- } catch (loadException | missingReqdSetting | IOException | invalidSettingValue | ServletException
- | InterruptedException e) {
- CommonStartup.eplog.error("FATAL_STARTUP_ERROR" + e.getMessage());
- throw new RuntimeException(e);
- } finally {
- // This will make the executor accept no new threads
- // and finish all existing threads in the queue
- if (executor != null) {
- executor.shutdown();
- }
-
- }
- }
-
- public void run() {
- try {
- fTomcatServer.start();
- } catch (LifecycleException | IOException e) {
-
- log.error("lifecycle or IO: ", e);
- }
- fTomcatServer.await();
- }
-
- public boolean isListnerstatus() {
- return listnerstatus;
- }
-
- public void setListnerstatus(boolean listnerstatus) {
- this.listnerstatus = listnerstatus;
- }
-
- public static Queue<JSONObject> getProcessingInputQueue() {
- return fProcessingInputQueue;
- }
-
- public static class QueueFullException extends Exception {
-
- private static final long serialVersionUID = 1L;
- }
-
- public static void handleEvents(JSONArray a) throws QueueFullException, JSONException, IOException {
- final Queue<JSONObject> queue = getProcessingInputQueue();
- try {
-
- CommonStartup.metriclog.info("EVENT_PUBLISH_START");
- for (int i = 0; i < a.length(); i++) {
- if (!queue.offer(a.getJSONObject(i))) {
- throw new QueueFullException();
- }
-
- }
- log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
- CommonStartup.metriclog.info("EVENT_PUBLISH_END");
- // ecomplogger.debug(secloggerMessageEnum.SEC_COLLECT_AND_PULIBISH_SUCCESS);
-
- } catch (JSONException e) {
- throw e;
-
- }
- }
-
- static String readFile(String path, Charset encoding) throws IOException {
- byte[] encoded = Files.readAllBytes(Paths.get(path));
- String pwd = new String(encoded);
- return pwd.substring(0, pwd.length() - 1);
- }
-
- public static String schemavalidate(String jsonData, String jsonSchema) {
- ProcessingReport report;
- String result = "false";
-
- try {
- // System.out.println("Applying schema: @<@<"+jsonSchema+">@>@ to
- // data: #<#<"+jsonData+">#>#");
- log.trace("Schema validation for event:" + jsonData);
- JsonNode schemaNode = JsonLoader.fromString(jsonSchema);
- JsonNode data = JsonLoader.fromString(jsonData);
- JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
- JsonSchema schema = factory.getJsonSchema(schemaNode);
- report = schema.validate(data);
- } catch (JsonParseException e) {
- log.error("schemavalidate:JsonParseException for event:" + jsonData);
- return e.getMessage().toString();
- } catch (ProcessingException e) {
- log.error("schemavalidate:Processing exception for event:" + jsonData);
- return e.getMessage().toString();
- } catch (IOException e) {
- log.error(
- "schemavalidate:IO exception; something went wrong trying to read json data for event:" + jsonData);
- return e.getMessage().toString();
- }
- if (report != null) {
- Iterator<ProcessingMessage> iter = report.iterator();
- while (iter.hasNext()) {
- ProcessingMessage pm = iter.next();
- log.trace("Processing Message: " + pm.getMessage());
- }
- result = String.valueOf(report.isSuccess());
- }
- try {
- log.debug("Validation Result:" + result + " Validation report:" + report);
- } catch (NullPointerException e) {
- log.error("schemavalidate:NullpointerException on report");
- }
- return result;
- }
-
- public static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;
- private static ApiServer fTomcatServer = null;
- private static final Logger log = LoggerFactory.getLogger(CommonStartup.class);
-
-}
-
+/*-
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.commonFunction;
+
+import com.att.nsa.apiServer.ApiServer;
+import com.att.nsa.apiServer.ApiServerConnector;
+import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.loadException;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.github.fge.jackson.JsonLoader;
+import com.github.fge.jsonschema.core.exceptions.ProcessingException;
+import com.github.fge.jsonschema.core.report.ProcessingMessage;
+import com.github.fge.jsonschema.core.report.ProcessingReport;
+import com.github.fge.jsonschema.main.JsonSchema;
+import com.github.fge.jsonschema.main.JsonSchemaFactory;
+import org.apache.catalina.LifecycleException;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.onap.dcae.ApplicationSettings;
+import org.onap.dcae.CLIUtils;
+import org.onap.dcae.commonFunction.event.publishing.DMaaPConfigurationParser;
+import org.onap.dcae.commonFunction.event.publishing.EventPublisher;
+import org.onap.dcae.restapi.RestfulCollectorServlet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class CommonStartup extends NsaBaseEndpoint implements Runnable {
+
+ private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");
+ public static final Logger inlog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.input");
+ static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.output");
+ public static final Logger eplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.error");
+
+ static int maxQueueEvent = 1024 * 4;
+ public static boolean schemaValidatorflag = false;
+ public static boolean authflag = false;
+ static boolean eventTransformFlag = true;
+ public static JSONObject schemaFileJson;
+ static String cambriaConfigFile;
+ public static io.vavr.collection.Map<String , String [] > streamID;
+
+ static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;
+ private static ApiServer fTomcatServer = null;
+ private static final Logger log = LoggerFactory.getLogger(CommonStartup.class);
+
+ private CommonStartup(ApplicationSettings settings) throws loadException, IOException, rrNvReadable.missingReqdSetting {
+ final List<ApiServerConnector> connectors = new LinkedList<>();
+
+ if (!settings.authorizationEnabled()) {
+ connectors.add(new ApiServerConnector.Builder(settings.httpPort()).secure(false).build());
+ }
+
+ final int securePort = settings.httpsPort();
+ final String keystoreFile = settings.keystoreFileLocation();
+ final String keystorePasswordFile = settings.keystorePasswordFileLocation();
+ final String keyAlias = settings.keystoreAlias();
+
+ if (settings.authorizationEnabled()) {
+ String keystorePassword = readFile(keystorePasswordFile);
+ connectors.add(new ApiServerConnector.Builder(securePort).secure(true)
+ .keystorePassword(keystorePassword).keystoreFile(keystoreFile).keyAlias(keyAlias).build());
+
+ }
+
+ schemaValidatorflag = settings.jsonSchemaValidationEnabled();
+ maxQueueEvent = settings.maximumAllowedQueuedEvents();
+ if (schemaValidatorflag) {
+ schemaFileJson = settings.jsonSchema();
+
+ }
+ authflag = settings.authorizationEnabled();
+ cambriaConfigFile = settings.cambriaConfigurationFileLocation();
+ streamID = settings.dMaaPStreamsMapping();
+ eventTransformFlag = settings.eventTransformingEnabled();
+
+ fTomcatServer = new ApiServer.Builder(connectors, new RestfulCollectorServlet(settings)).encodeSlashes(true)
+ .name("collector").build();
+ }
+
+ public static void main(String[] args) {
+ try {
+
+ fProcessingInputQueue = new LinkedBlockingQueue<>(CommonStartup.maxQueueEvent);
+
+ VESLogger.setUpEcompLogging();
+
+ CommonStartup cs = new CommonStartup(new ApplicationSettings(args, CLIUtils::processCmdLine));
+
+ Thread commonStartupThread = new Thread(cs);
+ commonStartupThread.start();
+
+ EventProcessor ep = new EventProcessor(EventPublisher.createPublisher(oplog,
+ DMaaPConfigurationParser
+ .parseToDomainMapping(Paths.get(cambriaConfigFile))
+ .get()));
+ ExecutorService executor = Executors.newFixedThreadPool(20);
+ for (int i = 0; i < 20; ++i) {
+ executor.execute(ep);
+ }
+ } catch (Exception e) {
+ CommonStartup.eplog.error("Fatal error during application startup", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void run() {
+ try {
+ fTomcatServer.start();
+ fTomcatServer.await();
+ } catch (LifecycleException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static class QueueFullException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+ }
+
+ public static void handleEvents(JSONArray a) throws QueueFullException, JSONException {
+ CommonStartup.metriclog.info("EVENT_PUBLISH_START");
+ for (int i = 0; i < a.length(); i++) {
+ if (!fProcessingInputQueue.offer(a.getJSONObject(i))) {
+ throw new QueueFullException();
+ }
+ }
+ log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
+ CommonStartup.metriclog.info("EVENT_PUBLISH_END");
+ }
+
+ private static String readFile(String path) throws IOException {
+ byte[] encoded = Files.readAllBytes(Paths.get(path));
+ String pwd = new String(encoded);
+ return pwd.substring(0, pwd.length() - 1);
+ }
+
+ public static String validateAgainstSchema(String jsonData, String jsonSchema) {
+ ProcessingReport report;
+ String result = "false";
+
+ try {
+ log.trace("Schema validation for event:" + jsonData);
+ JsonNode schemaNode = JsonLoader.fromString(jsonSchema);
+ JsonNode data = JsonLoader.fromString(jsonData);
+ JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
+ JsonSchema schema = factory.getJsonSchema(schemaNode);
+ report = schema.validate(data);
+ } catch (JsonParseException e) {
+ log.error("validateAgainstSchema:JsonParseException for event:" + jsonData);
+ return e.getMessage();
+ } catch (ProcessingException e) {
+ log.error("validateAgainstSchema:Processing exception for event:" + jsonData);
+ return e.getMessage();
+ } catch (IOException e) {
+ log.error(
+ "validateAgainstSchema:IO exception; something went wrong trying to read json data for event:" + jsonData);
+ return e.getMessage();
+ }
+ if (report != null) {
+ for (ProcessingMessage pm : report) {
+ log.trace("Processing Message: " + pm.getMessage());
+ }
+ result = String.valueOf(report.isSuccess());
+ }
+ try {
+ log.debug("Validation Result:" + result + " Validation report:" + report);
+ } catch (NullPointerException e) {
+ log.error("validateAgainstSchema:NullpointerException on report");
+ }
+ return result;
+ }
+
+
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java b/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java
index 51158aa7..a6de0fc8 100644
--- a/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java
+++ b/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java
@@ -21,7 +21,6 @@
package org.onap.dcae.commonFunction;
-
import java.text.DecimalFormat;
import org.json.JSONArray;
import org.json.JSONObject;
@@ -30,640 +29,475 @@ import org.slf4j.LoggerFactory;
public class ConfigProcessors {
- private static Logger log = LoggerFactory.getLogger(ConfigProcessors.class);
+ private static final Logger log = LoggerFactory.getLogger(ConfigProcessors.class);
private static final String FIELD = "field";
private static final String OLD_FIELD = "oldField";
private static final String FILTER = "filter";
private static final String VALUE = "value";
private static final String REGEX = "\\[\\]";
private static final String OBJECT_NOT_FOUND = "ObjectNotFound";
-
- public ConfigProcessors(JSONObject eventJson)
- {
- event = eventJson;
- }
-
- /**
- *
- */
- public void getValue(JSONObject J)
- {
- //log.info("addAttribute");
- final String field = J.getString(FIELD);
- //final String value = J.getString(VALUE);
- final JSONObject filter = J.optJSONObject(FILTER);
- if (filter == null || isFilterMet(filter))
- {
- //log.info("field ==" + field);
- //log.info("value ==" + value);
- getEventObjectVal(field);
- }
- else
- log.info("Filter not met");
- }
-
- /**
- *
- */
- public void setValue(JSONObject J)
- {
- //log.info("addAttribute");
- final String field = J.getString(FIELD);
- final String value = J.getString(VALUE);
- final JSONObject filter = J.optJSONObject(FILTER);
- if (filter == null || isFilterMet(filter))
- {
- //log.info("field ==" + field);
- //log.info("value ==" + value);
- setEventObjectVal(field, value);
- }
- else
- log.info("Filter not met");
- }
-
- /**
- *
- */
- public String evaluate(String str)
- {
- String value = str;
- if (str.startsWith("$"))
- {
- value = (String) getEventObjectVal(str.substring(1));
-
- }
- return value;
- }
-
- /**
- * { "functionName":"suppressEvent",
- "args":{}
+ private static final String FILTER_NOT_MET = "Filter not met";
+ private static final String COMP_FALSE = "==false";
+
+ private final JSONObject event;
+
+ public ConfigProcessors(JSONObject eventJson) {
+ event = eventJson;
+ }
+
+ public void getValue(JSONObject jsonObject) {
+
+ final String field = jsonObject.getString(FIELD);
+ final JSONObject filter = jsonObject.optJSONObject(FILTER);
+
+ if (filter == null || isFilterMet(filter)) {
+ getEventObjectVal(field);
+ } else
+ log.info(FILTER_NOT_MET);
+ }
+
+
+ public void setValue(JSONObject jsonObject) {
+ final String field = jsonObject.getString(FIELD);
+ final String value = jsonObject.getString(VALUE);
+ final JSONObject filter = jsonObject.optJSONObject(FILTER);
+ if (filter == null || isFilterMet(filter)) {
+ setEventObjectVal(field, value);
+ } else
+ log.info(FILTER_NOT_MET);
+ }
+
+
+
+ private String evaluate(String str) {
+ String value = str;
+ if (str.startsWith("$")) {
+ value = (String) getEventObjectVal(str.substring(1));
+
+ }
+ return value;
+ }
+
+
+ public void suppressEvent(JSONObject jsonObject) {
+ final JSONObject filter = jsonObject.optJSONObject(FILTER);
+
+ if (filter == null || isFilterMet(filter)) {
+ setEventObjectVal("suppressEvent", "true");
+ } else
+ log.info(FILTER_NOT_MET);
+ }
+
+
+ public void addAttribute(JSONObject jsonObject) {
+
+ final String field = jsonObject.getString(FIELD);
+ final String value = evaluate(jsonObject.getString(VALUE));
+ final JSONObject filter = jsonObject.optJSONObject(FILTER);
+ final String fieldType = jsonObject.optString("fieldType", "string").toLowerCase();
+
+ if (filter == null || isFilterMet(filter)) {
+ setEventObjectVal(field, value, fieldType);
+ } else
+ log.info(FILTER_NOT_MET);
+ }
+
+
+ public void updateAttribute(JSONObject jsonObject) {
+
+ final String field = jsonObject.getString(FIELD);
+ final String value = evaluate(jsonObject.getString(VALUE));
+ final JSONObject filter = jsonObject.optJSONObject(FILTER);
+ if (filter == null || isFilterMet(filter)) {
+ setEventObjectVal(field, value);
+ } else
+ log.info(FILTER_NOT_MET);
+ }
+
+
+ public void removeAttribute(JSONObject jsonObject) {
+
+ final String field = jsonObject.getString(FIELD);
+ final JSONObject filter = jsonObject.optJSONObject(FILTER);
+
+ if (filter == null || isFilterMet(filter)) {
+ removeEventKey(field);
+ } else
+ log.info(FILTER_NOT_MET);
+ }
+
+
+ private void renameArrayInArray(JSONObject jsonObject) // map
+ {
+ log.info("renameArrayInArray");
+ final String field = jsonObject.getString(FIELD);
+ final String oldField = jsonObject.getString(OLD_FIELD);
+ final JSONObject filter = jsonObject.optJSONObject(FILTER);
+
+ if (filter == null || isFilterMet(filter)) {
+
+ final String[] fsplit = field.split(REGEX, field.length());
+ final String[] oldfsplit = oldField.split(REGEX, oldField.length());
+
+ final String oldValue = getEventObjectVal(oldfsplit[0]).toString();
+ if (!oldValue.equals(OBJECT_NOT_FOUND)) {
+ final String oldArrayName = oldfsplit[1].substring(1);
+ final String newArrayName = fsplit[1].substring(1);
+ final String value = oldValue.replaceAll(oldArrayName, newArrayName);
+
+ log.info("oldValue ==" + oldValue);
+ log.info("value ==" + value);
+ JSONArray ja = new JSONArray(value);
+ removeEventKey(oldfsplit[0]);
+ setEventObjectVal(fsplit[0], ja);
}
- */
- public void suppressEvent(JSONObject J)
- {
- //log.info("addAttribute");
- final JSONObject filter = J.optJSONObject(FILTER);
-
- if (filter == null || isFilterMet(filter))
- {
- //log.info("field ==" + field);
- //log.info("value ==" + value);
- setEventObjectVal("suppressEvent", "true");
- }
- else
- log.info("Filter not met");
- }
-
- /**
- *
- */
- public void addAttribute(JSONObject J)
- {
- //log.info("addAttribute begin");
- final String field = J.getString(FIELD);
- final String value = evaluate(J.getString(VALUE));
- final JSONObject filter = J.optJSONObject(FILTER);
- final String fieldType = J.optString("fieldType", "string").toLowerCase();
-
- if (filter == null || isFilterMet(filter))
- {
- //log.info("field ==" + field);
- //log.info("value ==" + value);
- setEventObjectVal(field, value, fieldType);
- }
- else
- log.info("Filter not met");
- //log.info("addAttribute End");
- }
-
- /**
- *
- */
- public void updateAttribute(JSONObject J)
- {
- //log.info("updateAttribute");
- final String field = J.getString(FIELD);
- final String value = evaluate(J.getString(VALUE));
- final JSONObject filter = J.optJSONObject(FILTER);
- if (filter == null || isFilterMet(filter))
- {
- //log.info("field ==" + field);
- //log.info("value ==" + value);
- setEventObjectVal(field, value);
- }
- else
- log.info("Filter not met");
- }
-
- /**
- *
- */
- public void removeAttribute(JSONObject J)
- {
- //log.info("removeAttribute");
- final String field = J.getString(FIELD);
- final JSONObject filter = J.optJSONObject(FILTER);
-
- if (filter == null || isFilterMet(filter))
- {
- removeEventKey(field);
- }
- else
- log.info("Filter not met");
- }
-
- /**
- *
- */
- public void renameArrayInArray(JSONObject J) //map
- {
- log.info("renameArrayInArray");
- final String field = J.getString(FIELD);
- final String oldField = J.getString(OLD_FIELD);
- final JSONObject filter = J.optJSONObject(FILTER);
- //String value = "";
- if (filter == null || isFilterMet(filter))
- {
- //log.info("field ==" + field);
- final String[] fsplit = field.split(REGEX, field.length());
- final String[] oldfsplit = oldField.split(REGEX, oldField.length());
- /*for (int i=0; i< oldfsplit.length; i++ )
- {
- log.info( "renameArrayInArray " + i + " ==" + oldfsplit[i]);
- }*/
-
- final String oldValue = getEventObjectVal(oldfsplit[0]).toString();
- if (!oldValue.equals(OBJECT_NOT_FOUND)){
- final String oldArrayName = oldfsplit[1].substring(1);
- final String newArrayName = fsplit[1].substring(1);
- final String value = oldValue.replaceAll(oldArrayName, newArrayName);
- //log.info("oldArrayName ==" + oldArrayName);
- //log.info("newArrayName ==" + newArrayName);
- log.info("oldValue ==" + oldValue);
- log.info("value ==" + value);
- JSONArray ja = new JSONArray(value);
- removeEventKey(oldfsplit[0]);
- setEventObjectVal(fsplit[0], ja);
- }
- }
- else
- log.info("Filter not met");
- }
-
- /**
- *
- */
- public void map(JSONObject J)
- {
- //log.info("mapAttribute");
- final String field = J.getString(FIELD);
- if (field.contains("[]"))
- {
- if (field.matches(".*\\[\\]\\..*\\[\\]"))
- renameArrayInArray(J);
- else
- mapToJArray(J);
- }
- else
- mapAttribute(J);
- }
-
- /**
- *
- */
- public String performOperation(String operation, String value)
- {
- log.info("performOperation");
- if (operation != null)
- {
- if (operation.equals("convertMBtoKB"))
- {
- float kbValue = Float.parseFloat(value) * 1024;
- value = String.valueOf(kbValue);
- }
- }
- return value;
- }
-
- /**
- *
- */
- //public void mapAttributeToArrayAttribute(JSONObject J)
- public void mapAttribute(JSONObject J)
- {
- //log.info("mapAttribute");
- final String field = J.getString(FIELD);
- final String oldField = J.getString(OLD_FIELD);
- final JSONObject filter = J.optJSONObject(FILTER);
- final String operation = J.optString("operation");
- String value = "";
- if (filter == null || isFilterMet(filter))
- {
- //log.info("field ==" + field);
-
- value = getEventObjectVal(oldField).toString();
- if (!value.equals(OBJECT_NOT_FOUND))
- {
- if (operation != null && !operation.equals(""))
- value = performOperation(operation, value);
- //log.info("value ==" + value);
- setEventObjectVal(field, value);
-
- removeEventKey(oldField);
- }
- }
- else
- log.info("Filter not met");
- }
-
- /**
- *
- */
- public void mapToJArray(JSONObject J)
- {
- log.info("mapToJArray");
- String field = J.getString(FIELD);
- String oldField = J.getString(OLD_FIELD);
- final JSONObject filter = J.optJSONObject(FILTER);
- final JSONObject attrMap = J.optJSONObject("attrMap");
- oldField = oldField.replaceAll(REGEX, "");
- field = field.replaceAll(REGEX, "");
-
- //log.info("oldField ==" + field);
- if (filter == null || isFilterMet(filter))
- {
- //log.info("oldField ==" + field);
- String value = getEventObjectVal(oldField).toString();
- if (!value.equals(OBJECT_NOT_FOUND))
- {
- log.info("old value ==" + value.toString());
- //update old value based on attrMap
- if (attrMap != null)
- {
- //loop thru attrMap and update attribute name to new name
- for (String key : attrMap.keySet())
- {
- //log.info("attr key==" + key + " value==" + attrMap.getString(key));
- value = value.replaceAll(key, attrMap.getString(key));
- }
- }
-
- log.info("new value ==" + value);
- char c = value.charAt(0);
- if (c != '[')
- {
- //oldfield is JsonObject
- JSONObject valueJO = new JSONObject(value);
- // if the array already exists
-
- String existingValue = getEventObjectVal(field).toString();
- if (!existingValue.equals(OBJECT_NOT_FOUND))
- {
- JSONArray ja = new JSONArray(existingValue);
- JSONObject jo = ja.optJSONObject(0);
- if (jo != null)
- {
- for (String key : valueJO.keySet())
- {
- jo.put(key, valueJO.get(key));
-
- }
- ja.put(0, jo);
- //log.info("jarray== " + ja.toString());
- setEventObjectVal(field,ja);
- }
- }
- else //if new array
- setEventObjectVal(field + "[0]", new JSONObject(value), "JArray");
- }
- else //oldfield is jsonArray
- setEventObjectVal(field, new JSONArray(value));
-
- removeEventKey(oldField);
- }
- }
- else
- log.info("Filter not met");
- }
-
- /**
- * example -
- {
- "functionName": "concatenateValue",
- "args":{
- "filter": {"event.commonEventHeader.event":"heartbeat"},
- FIELD:"event.commonEventHeader.eventName",
- "concatenate": ["event.commonEventHeader.domain","event.commonEventHeader.eventType","event.commonEventHeader.alarmCondition"],
- "delimiter":"_"
- }
- }
- **/
- public void concatenateValue(JSONObject J)
- {
- //log.info("concatenateValue");
- final String field = J.getString(FIELD);
- final String delimiter = J.getString("delimiter");
- final JSONArray values = J.getJSONArray("concatenate");
- final JSONObject filter = J.optJSONObject(FILTER);
- if (filter == null || isFilterMet(filter))
- {
- String value = "";
- for (int i=0; i < values.length(); i++)
- {
- //log.info(values.getString(i));
- String tempVal = evaluate(values.getString(i));
- if (!tempVal.equals(OBJECT_NOT_FOUND))
- {
- if (i ==0)
- value = value + tempVal;
- else
- value = value + delimiter + tempVal;
- }
- }
- //log.info("value ==" + value);
- setEventObjectVal(field, value);
- }
- else
- log.info("Filter not met");
- }
-
- public void subtractValue(JSONObject J)
- {
- //log.info("concatenateValue");
- final String field = J.getString(FIELD);
- final JSONArray values = J.getJSONArray("subtract");
- final JSONObject filter = J.optJSONObject(FILTER);
- if (filter == null || isFilterMet(filter))
- {
- float value = 0;
- for (int i=0; i < values.length(); i++)
- {
- log.info(values.getString(i));
- String tempVal = evaluate(values.getString(i));
- log.info("tempVal==" + tempVal);
- if (!tempVal.equals(OBJECT_NOT_FOUND))
- {
- if (i ==0)
- value = value + Float.valueOf(tempVal);
- else
- value = value - Float.valueOf(tempVal);
- }
- }
- log.info("value ==" + value );
- setEventObjectVal(field, value, "number");
- }
- else
- log.info("Filter not met");
- }
-
- /**
- *
- */
- private void removeEventKey(String field)
- {
- String[] keySet = field.split("\\.",field.length());
- JSONObject keySeries = event;
- for (int i=0; i<(keySet.length -1); i++ )
- {
- //log.info( i + " ==" + keySet[i]);
- keySeries = keySeries.getJSONObject(keySet[i]);
- }
- //log.info(keySet[keySet.length -1]);
-
- keySeries.remove(keySet[keySet.length -1]);
-
- }
-
- /**
- *
- */
- private boolean checkFilter(JSONObject jo, String key, String logicKey)
- {
- String filterValue = jo.getString(key);
- boolean retVal = true;
-
- if(filterValue.contains(":"))
- {
- String[] splitVal = filterValue.split(":");
- //log.info(splitVal[0] + " " + splitVal[1]);
- if (splitVal[0].equals("matches"))
- {
- if (logicKey.equals("not"))
- {
- //log.info("not");
- //log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "split1==" + splitVal[1]);
- if (getEventObjectVal(key).toString().matches(splitVal[1]))
- {
- log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false");
- return false;
- }
- }
- else
- {
- if (!(getEventObjectVal(key).toString().matches(splitVal[1])))
- {
- log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false");
- return false;
- }
+ } else
+ log.info(FILTER_NOT_MET);
+ }
+
+
+ public void map(JSONObject jsonObject) {
+
+ final String field = jsonObject.getString(FIELD);
+ if (field.contains("[]")) {
+ if (field.matches(".*\\[\\]\\..*\\[\\]"))
+ renameArrayInArray(jsonObject);
+ else
+ mapToJArray(jsonObject);
+ } else
+ mapAttribute(jsonObject);
+ }
+
+ private String performOperation(String operation, String value) {
+ log.info("performOperation");
+ if ("convertMBtoKB".equals(operation)) {
+ float kbValue = Float.parseFloat(value) * 1024;
+ value = String.valueOf(kbValue);
+ }
+ return value;
+ }
+
+
+ public void mapAttribute(JSONObject jsonObject) {
+
+ final String field = jsonObject.getString(FIELD);
+ final String oldField = jsonObject.getString(OLD_FIELD);
+ final JSONObject filter = jsonObject.optJSONObject(FILTER);
+ final String operation = jsonObject.optString("operation");
+ String value;
+ if (filter == null || isFilterMet(filter)) {
+
+ value = getEventObjectVal(oldField).toString();
+ if (!value.equals(OBJECT_NOT_FOUND)) {
+ if (operation != null && !operation.isEmpty())
+ value = performOperation(operation, value);
+
+ setEventObjectVal(field, value);
+
+ removeEventKey(oldField);
+ }
+ } else
+ log.info(FILTER_NOT_MET);
+ }
+
+
+ private void mapToJArray(JSONObject jsonObject) {
+ log.info("mapToJArray");
+ String field = jsonObject.getString(FIELD);
+ String oldField = jsonObject.getString(OLD_FIELD);
+ final JSONObject filter = jsonObject.optJSONObject(FILTER);
+ final JSONObject attrMap = jsonObject.optJSONObject("attrMap");
+ oldField = oldField.replaceAll(REGEX, "");
+ field = field.replaceAll(REGEX, "");
+
+ if (filter == null || isFilterMet(filter)) {
+
+ String value = getEventObjectVal(oldField).toString();
+ if (!value.equals(OBJECT_NOT_FOUND)) {
+ log.info("old value ==" + value);
+ // update old value based on attrMap
+ if (attrMap != null) {
+ // loop thru attrMap and update attribute name to new name
+ for (String key : attrMap.keySet()) {
+ value = value.replaceAll(key, attrMap.getString(key));
}
-
}
- if (splitVal[0].equals("contains"))
- {
- if (logicKey.equals("not"))
- {
- //log.info("not");
- //log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "split1==" + splitVal[1]);
- if (getEventObjectVal(key).toString().contains(splitVal[1]))
- {
- log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false");
- return false;
+
+ log.info("new value ==" + value);
+ char c = value.charAt(0);
+ if (c != '[') {
+ // oldfield is JsonObject
+ JSONObject valueJO = new JSONObject(value);
+ // if the array already exists
+ String existingValue = getEventObjectVal(field).toString();
+ if (!existingValue.equals(OBJECT_NOT_FOUND)) {
+ JSONArray ja = new JSONArray(existingValue);
+ JSONObject jo = ja.optJSONObject(0);
+ if (jo != null) {
+ for (String key : valueJO.keySet()) {
+ jo.put(key, valueJO.get(key));
+
+ }
+ ja.put(0, jo);
+
+ setEventObjectVal(field, ja);
}
- }
+ } else // if new array
+ setEventObjectVal(field + "[0]", new JSONObject(value), "JArray");
+ } else // oldfield is jsonArray
+ setEventObjectVal(field, new JSONArray(value));
+
+ removeEventKey(oldField);
+ }
+ } else
+ log.info(FILTER_NOT_MET);
+ }
+
+ /**
+ * example - { "functionName": "concatenateValue", "args":{ "filter":
+ * {"event.commonEventHeader.event":"heartbeat"},
+ * FIELD:"event.commonEventHeader.eventName", "concatenate":
+ * ["event.commonEventHeader.domain","event.commonEventHeader.eventType","event.commonEventHeader.alarmCondition"],
+ * "delimiter":"_" } }
+ **/
+ public void concatenateValue(JSONObject jsonObject) {
+
+ final String field = jsonObject.getString(FIELD);
+ final String delimiter = jsonObject.getString("delimiter");
+ final JSONArray values = jsonObject.getJSONArray("concatenate");
+ final JSONObject filter = jsonObject.optJSONObject(FILTER);
+ if (filter == null || isFilterMet(filter)) {
+ StringBuilder value = new StringBuilder();
+ for (int i = 0; i < values.length(); i++) {
+
+ String tempVal = evaluate(values.getString(i));
+ if (!tempVal.equals(OBJECT_NOT_FOUND)) {
+ if (i == 0)
+ value.append(tempVal);
else
- {
- if (!(getEventObjectVal(key).toString().contains(splitVal[1])))
- {
- log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false");
- return false;
- }
+ value.append(delimiter).append(tempVal);
+ }
+ }
+
+ setEventObjectVal(field, value.toString());
+ } else
+ log.info(FILTER_NOT_MET);
+ }
+
+ public void subtractValue(JSONObject jsonObject) {
+
+ final String field = jsonObject.getString(FIELD);
+ final JSONArray values = jsonObject.getJSONArray("subtract");
+ final JSONObject filter = jsonObject.optJSONObject(FILTER);
+ if (filter == null || isFilterMet(filter)) {
+ float value = 0;
+ for (int i = 0; i < values.length(); i++) {
+ log.info(values.getString(i));
+ String tempVal = evaluate(values.getString(i));
+ log.info("tempVal==" + tempVal);
+ if (!tempVal.equals(OBJECT_NOT_FOUND)) {
+ if (i == 0)
+ value = value + Float.valueOf(tempVal);
+ else
+ value = value - Float.valueOf(tempVal);
+ }
+ }
+ log.info("value ==" + value);
+ setEventObjectVal(field, value, "number");
+ } else
+ log.info(FILTER_NOT_MET);
+ }
+
+
+ private void removeEventKey(String field) {
+ String[] keySet = field.split("\\.", field.length());
+ JSONObject keySeries = event;
+ for (int i = 0; i < (keySet.length - 1); i++) {
+
+ keySeries = keySeries.getJSONObject(keySet[i]);
+ }
+
+ keySeries.remove(keySet[keySet.length - 1]);
+ }
+
+
+ private boolean checkFilter(JSONObject jo, String key, String logicKey) {
+ String filterValue = jo.getString(key);
+ if (filterValue.contains(":")) {
+ String[] splitVal = filterValue.split(":");
+ if ("matches".equals(splitVal[0])) {
+ if ("not".equals(logicKey)) {
+ if (getEventObjectVal(key).toString().matches(splitVal[1])) {
+ log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE);
+ return false;
+ }
+ } else {
+ if (!(getEventObjectVal(key).toString().matches(splitVal[1]))) {
+ log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE);
+ return false;
}
-
}
+
}
- else
- {
- if (logicKey.equals("not"))
- {
- if(getEventObjectVal(key).toString().equals(filterValue))
- {
- log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false");
+ if ("contains".equals(splitVal[0])) {
+ if ("not".equals(logicKey)) {
+ if (getEventObjectVal(key).toString().contains(splitVal[1])) {
+ log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE);
+ return false;
+ }
+ } else {
+ if (!(getEventObjectVal(key).toString().contains(splitVal[1]))) {
+ log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE);
return false;
}
}
- else
+
+ }
+ } else {
+ if ("not".equals(logicKey)) {
+ if (getEventObjectVal(key).toString().equals(filterValue)) {
+ log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE);
+ return false;
+ }
+ } else {
+ if (!(getEventObjectVal(key).toString().equals(filterValue))) {
+ log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + COMP_FALSE);
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+
+ public boolean isFilterMet(JSONObject jo) {
+ for (String key : jo.keySet()) {
+ if ("not".equals(key)) {
+ JSONObject njo = jo.getJSONObject(key);
+ for (String njoKey : njo.keySet()) {
+ if (!checkFilter(njo, njoKey, key))
+ return false;
+ }
+ } else {
+ if (!checkFilter(jo, key, key))
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * returns a string or JSONObject or JSONArray
+ **/
+ public Object getEventObjectVal(String keySeriesStr) {
+ keySeriesStr = keySeriesStr.replaceAll("\\[", ".");
+ keySeriesStr = keySeriesStr.replaceAll("\\]", ".");
+ if (keySeriesStr.contains("..")) {
+ keySeriesStr = keySeriesStr.replaceAll("\\.\\.", ".");
+ }
+
+ if (keySeriesStr.lastIndexOf(".") == keySeriesStr.length() - 1)
+ keySeriesStr = keySeriesStr.substring(0, keySeriesStr.length() - 1);
+ String[] keySet = keySeriesStr.split("\\.", keySeriesStr.length());
+ Object keySeriesObj = event;
+ for (String aKeySet : keySet) {
+ if (keySeriesObj != null) {
+ if (keySeriesObj instanceof String) {
+
+ log.info("STRING==" + keySeriesObj);
+ } else if (keySeriesObj instanceof JSONArray) {
+ keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(aKeySet));
+
+ } else if (keySeriesObj instanceof JSONObject) {
+ keySeriesObj = ((JSONObject) keySeriesObj).opt(aKeySet);
+
+ } else {
+ log.info("unknown object==" + keySeriesObj);
+ }
+ }
+ }
+
+ if (keySeriesObj == null)
+ return OBJECT_NOT_FOUND;
+ return keySeriesObj;
+ }
+
+ public void setEventObjectVal(String keySeriesStr, Object value) {
+ setEventObjectVal(keySeriesStr, value, "string");
+ }
+
+ /**
+ * returns a string or JSONObject or JSONArray
+ **/
+ public void setEventObjectVal(String keySeriesStr, Object value, String fieldType) {
+ keySeriesStr = keySeriesStr.replaceAll("\\[", ".");
+ keySeriesStr = keySeriesStr.replaceAll("\\]", ".");
+ if (keySeriesStr.contains("..")) {
+ keySeriesStr = keySeriesStr.replaceAll("\\.\\.", ".");
+ }
+ log.info("fieldType==" + fieldType);
+
+ if (keySeriesStr.lastIndexOf(".") == keySeriesStr.length() - 1)
+ keySeriesStr = keySeriesStr.substring(0, keySeriesStr.length() - 1);
+ String[] keySet = keySeriesStr.split("\\.", keySeriesStr.length());
+ Object keySeriesObj = event;
+ for (int i = 0; i < (keySet.length - 1); i++) {
+
+ if (keySeriesObj instanceof JSONArray) {
+
+ if (((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i])) == null) // if
+ // the
+ // object
+ // is
+ // not
+ // there
+ // then
+ // add
+ // it
{
- if(!(getEventObjectVal(key).toString().equals(filterValue)))
- {
- log.info(filterValue + "==" + key + "==" + getEventObjectVal(key) + "==false");
- return false;
- }
+ log.info("Object is null, must add it");
+ if (keySet[i + 1].matches("[0-9]*")) // if index then array
+ ((JSONArray) keySeriesObj).put(Integer.parseInt(keySet[i]), new JSONArray());
+ else
+ ((JSONArray) keySeriesObj).put(Integer.parseInt(keySet[i]), new JSONObject());
}
+ keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i]));
+
+ } else if (keySeriesObj instanceof JSONObject) {
+ if (((JSONObject) keySeriesObj).opt(keySet[i]) == null) // if
+ // the
+ // object
+ // is
+ // not
+ // there
+ // then
+ // add
+ // it
+ {
+ if (keySet[i + 1].matches("[0-9]*")) // if index then array
+ ((JSONObject) keySeriesObj).put(keySet[i], new JSONArray());
+ else
+ ((JSONObject) keySeriesObj).put(keySet[i], new JSONObject());
+ log.info("Object is null, must add it");
+ }
+ keySeriesObj = ((JSONObject) keySeriesObj).opt(keySet[i]);
+ } else {
+ log.info("unknown object==" + keySeriesObj);
}
- return retVal;
- }
- /**
- *
- */
- public boolean isFilterMet(JSONObject jo)
- {
- boolean retval = true;
- //log.info("Filter==" + jo.toString());
- for (String key : jo.keySet())
- {
- if (key.equals("not"))
- {
- JSONObject njo = jo.getJSONObject(key);
- for (String njoKey : njo.keySet())
- {
- //log.info(njoKey);
- retval = checkFilter(njo, njoKey, key);
- if (retval == false)
- return retval;
- }
- }
- else
- {
- //log.info(key);
- //final String filterKey = key;
- retval = checkFilter(jo, key, key);
- if (retval == false)
- return retval;
- }
- }
- return true;
- }
-
- /**
- * returns a string or JSONObject or JSONArray
- **/
- public Object getEventObjectVal(String keySeriesStr)
- {
- keySeriesStr = keySeriesStr.replaceAll("\\[", ".");
- keySeriesStr = keySeriesStr.replaceAll("\\]", ".");
- if (keySeriesStr.contains(".."))
- {
- keySeriesStr = keySeriesStr.replaceAll("\\.\\.", ".");
- }
- //log.info(Integer.toString(keySeriesStr.lastIndexOf(".")));
- //log.info(Integer.toString(keySeriesStr.length() -1));
- if (keySeriesStr.lastIndexOf(".") == keySeriesStr.length() -1 )
- keySeriesStr = keySeriesStr.substring(0,keySeriesStr.length()-1 );
- String[] keySet = keySeriesStr.split("\\.", keySeriesStr.length());
- Object keySeriesObj = event;
- for (int i=0; i<(keySet.length); i++ )
- {
- //log.info( "getEventObject " + i + " ==" + keySet[i]);
- if (keySeriesObj != null)
- {
- if (keySeriesObj instanceof String)
- {
- //keySeriesObj = keySeriesObj.get(keySet[i]);
- log.info("STRING==" + keySeriesObj);
- }
- else if (keySeriesObj instanceof JSONArray) {
- keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i]));
- //log.info("ARRAY==" + keySeriesObj);
- }
- else if (keySeriesObj instanceof JSONObject) {
- keySeriesObj = ( (JSONObject) keySeriesObj).opt(keySet[i]);
- //log.info("JSONObject==" + keySeriesObj);
- }
- else
- {
- log.info("unknown object==" + keySeriesObj);
- }
- }
- }
-
- if (keySeriesObj == null)
- return OBJECT_NOT_FOUND;
- return keySeriesObj;
- }
-
- public void setEventObjectVal(String keySeriesStr, Object value)
- {
- setEventObjectVal(keySeriesStr, value, "string");
- }
-
- /**
- * returns a string or JSONObject or JSONArray
- **/
- public void setEventObjectVal(String keySeriesStr, Object value, String fieldType)
- {
- keySeriesStr = keySeriesStr.replaceAll("\\[", ".");
- keySeriesStr = keySeriesStr.replaceAll("\\]", ".");
- if (keySeriesStr.contains(".."))
- {
- keySeriesStr = keySeriesStr.replaceAll("\\.\\.", ".");
- }
- log.info("fieldType==" + fieldType);
- //log.info(Integer.toString(keySeriesStr.lastIndexOf(".")));
- //log.info(Integer.toString(keySeriesStr.length() -1));
- if (keySeriesStr.lastIndexOf(".") == keySeriesStr.length() -1 )
- keySeriesStr = keySeriesStr.substring(0,keySeriesStr.length()-1 );
- String[] keySet = keySeriesStr.split("\\.", keySeriesStr.length());
- Object keySeriesObj = event;
- for (int i=0; i<(keySet.length -1); i++ )
- {
- //log.info( "setEventObject " + i + " ==" + keySet[i]);
- if (keySeriesObj instanceof JSONArray) {
- //keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i]));
- if (((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i])) == null) //if the object is not there then add it
- {
- log.info("Object is null, must add it");
- if (keySet[i+1].matches("[0-9]*")) // if index then array
- ((JSONArray) keySeriesObj).put(Integer.parseInt(keySet[i]), new JSONArray());
- else
- ((JSONArray) keySeriesObj).put(Integer.parseInt(keySet[i]), new JSONObject());
- }
- keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i]));
- //log.info("ARRAY==" + keySeriesObj);
- }
- else if (keySeriesObj instanceof JSONObject) {
- if (( (JSONObject) keySeriesObj).opt(keySet[i]) == null) //if the object is not there then add it
- {
- if (keySet[i+1].matches("[0-9]*")) // if index then array
- ((JSONObject) keySeriesObj).put(keySet[i], new JSONArray());
- else
- ((JSONObject) keySeriesObj).put(keySet[i], new JSONObject());
- log.info("Object is null, must add it");
- }
- keySeriesObj = ( (JSONObject) keySeriesObj).opt(keySet[i]);
- //log.info("JSONObject==" + keySeriesObj);
- }
- else
- {
- log.info("unknown object==" + keySeriesObj);
- }
- }
- if (fieldType.equals("number") )
- {
- DecimalFormat df = new DecimalFormat("#.0");
- if (value instanceof String)
- ((JSONObject)keySeriesObj).put(keySet[keySet.length -1], Float.valueOf(df.format(Float.valueOf((String) value))));
- else
- ((JSONObject)keySeriesObj).put(keySet[keySet.length -1], Float.valueOf(df.format(value)));
- }
- else if (fieldType.equals("integer") && value instanceof String)
- ((JSONObject)keySeriesObj).put(keySet[keySet.length -1], Integer.valueOf((String) value));
- else if (fieldType.equals("JArray"))
- ((JSONArray)keySeriesObj).put( value);
- else
- ((JSONObject)keySeriesObj).put(keySet[keySet.length -1], value);
-
- }
- private JSONObject event = new JSONObject();
-}
+ }
+ if ("number".equals(fieldType)) {
+ DecimalFormat df = new DecimalFormat("#.0");
+ if (value instanceof String)
+ ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1],
+ Float.valueOf(df.format(Float.valueOf((String) value))));
+ else
+ ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], Float.valueOf(df.format(value)));
+ } else if ("integer".equals(fieldType) && value instanceof String)
+ ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], Integer.valueOf((String) value));
+ else if ("JArray".equals(fieldType))
+ ((JSONArray) keySeriesObj).put(value);
+ else
+ ((JSONObject) keySeriesObj).put(keySet[keySet.length - 1], value);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java b/src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java
deleted file mode 100644
index 10a1db47..00000000
--- a/src/main/java/org/onap/dcae/commonFunction/CustomExceptionLoader.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.commonFunction;
-
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.HashMap;
-
-import java.util.Map.Entry;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonIOException;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
-
-public class CustomExceptionLoader {
-
- protected static HashMap<String, JsonArray> map = null;
- private static final Logger log = LoggerFactory.getLogger ( CustomExceptionLoader.class );
-
- //For standalone test
- //LoadMap Invoked from servletSetup
- /*
- public static void main(String[] args) {
-
- System.out.println("CustomExceptionLoader.main --> Arguments -- ExceptionConfig file: " + args[0] + "StatusCode:" + args[1]+ " Error Msg:" + args[2]);
- CommonStartup.exceptionConfig = args[0];
-
- //Read the Custom exception JSON file into map
- LoadMap();
- System.out.println("CustomExceptionLoader.main --> Map info post LoadMap:" + map);
-
- String[] str= LookupMap(args[1],args[2]);
- if (! (str==null)) {
- System.out.println("CustomExceptionLoader.main --> Return from lookup function" + str[0] + "value:" + str[1]);
- }
-
- }
- */
-
- public static void LoadMap () {
-
- map = new HashMap<String, JsonArray>();
- FileReader fr = null;
- try {
- JsonElement root = null;
- fr = new FileReader(CommonStartup.exceptionConfig);
- root = new JsonParser().parse(fr);
- JsonObject jsonObject = root.getAsJsonObject().get("code").getAsJsonObject();
-
- for (Entry<String, JsonElement> entry : jsonObject.entrySet()) {
- map.put(entry.getKey(), (JsonArray) entry.getValue());
- }
-
- log.debug("CustomExceptionLoader.LoadMap --> Map loaded - " + map);
- } catch (JsonIOException|JsonSyntaxException|FileNotFoundException e) {
- log.error("Exception in LoadMap:" + e.getMessage());
- //e.printStackTrace();
- map = null;
- }
- finally {
- if (fr != null) {
- try {
- fr.close();
- } catch (IOException e) {
- log.error("Error closing file reader stream : " +e.toString());
- map = null;
- }
- }
- }
- }
-
- public static String[] LookupMap (String error, String errormsg) {
-
- String[] retarray = null;
-
- log.debug("CustomExceptionLoader.LookupMap -->" + " HTTP StatusCode:" + error + " Msg:" + errormsg);
- try{
-
- JsonArray jarray = map.get(error);
- for (int i = 0; i < jarray.size(); i++) {
-
- JsonElement val = jarray.get(i).getAsJsonObject().get("Reason");
- JsonArray ec = (JsonArray) jarray.get(i).getAsJsonObject().get("ErrorCode");
- log.trace("CustomExceptionLoader.LookupMap Parameter -> Error msg : " + errormsg + " Reason text being matched:" + val);
- if (errormsg.contains(val.toString().replace("\"", ""))){
- log.trace("CustomExceptionLoader.LookupMap Successful! Exception matched to error message StatusCode:" + ec.get(0).toString() + "ErrorMessage:" + ec.get(1).toString());
- retarray = new String[2];
- retarray[0]=ec.get(0).toString();
- retarray[1]=ec.get(1).toString();
- return retarray;
- }
- }
-
- }
- catch (Exception e)
- {
- System.out.println(e.getMessage());
- }
-
- return retarray;
- }
-
-}
diff --git a/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java b/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java
deleted file mode 100644
index b10f5882..00000000
--- a/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.commonFunction;
-
-import java.io.FileNotFoundException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonIOException;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
-
-public class DmaapPropertyReader {
-
- private static DmaapPropertyReader instance = null;
-
- private static final Logger log = LoggerFactory.getLogger(DmaapPropertyReader.class);
-
- public HashMap<String, String> dmaap_hash = new HashMap<String, String>();
-
- public DmaapPropertyReader(String CambriaConfigFile) {
-
- FileReader fr = null;
- try {
- JsonElement root = null;
- fr = new FileReader(CambriaConfigFile);
- root = new JsonParser().parse(fr);
-
- //Check if dmaap config is handled by legacy controller/service manager
- if (root.getAsJsonObject().has("channels")) {
- JsonArray jsonObject = (JsonArray) root.getAsJsonObject().get("channels");
-
- for (int i = 0; i < jsonObject.size(); i++) {
- log.debug("TOPIC:" + jsonObject.get(i).getAsJsonObject().get("cambria.topic") + " HOST-URL:"
- + jsonObject.get(i).getAsJsonObject().get("cambria.url") + " HOSTS:"
- + jsonObject.get(i).getAsJsonObject().get("cambria.hosts") + " PWD:"
- + jsonObject.get(i).getAsJsonObject().get("basicAuthPassword") + " USER:"
- + jsonObject.get(i).getAsJsonObject().get("basicAuthUsername") + " NAME:"
- + jsonObject.get(i).getAsJsonObject().get("name"));
-
- String convertedname = jsonObject.get(i).getAsJsonObject().get("name").toString().replace("\"", "");
- dmaap_hash.put(convertedname + ".cambria.topic",
- jsonObject.get(i).getAsJsonObject().get("cambria.topic").toString().replace("\"", ""));
-
- if (jsonObject.get(i).getAsJsonObject().get("cambria.hosts") != null) {
- dmaap_hash.put(convertedname + ".cambria.hosts",
- jsonObject.get(i).getAsJsonObject().get("cambria.hosts").toString().replace("\"", ""));
- }
- if (jsonObject.get(i).getAsJsonObject().get("cambria.url") != null) {
- dmaap_hash.put(convertedname + ".cambria.url",
- jsonObject.get(i).getAsJsonObject().get("cambria.url").toString().replace("\"", ""));
- }
- if (jsonObject.get(i).getAsJsonObject().get("basicAuthPassword") != null) {
- dmaap_hash.put(convertedname + ".basicAuthPassword", jsonObject.get(i).getAsJsonObject()
- .get("basicAuthPassword").toString().replace("\"", ""));
- }
- if (jsonObject.get(i).getAsJsonObject().get("basicAuthUsername") != null) {
- dmaap_hash.put(convertedname + ".basicAuthUsername", jsonObject.get(i).getAsJsonObject()
- .get("basicAuthUsername").toString().replace("\"", ""));
- }
-
- }
- } else {
-
- //Handing new format from controllergen2/config_binding_service
- JsonObject jsonObject = root.getAsJsonObject();
- Set<Map.Entry<String, JsonElement>> entries = jsonObject.entrySet();
-
- for (Map.Entry<String, JsonElement> entry : entries) {
-
- JsonElement topicurl = entry.getValue().getAsJsonObject().get("dmaap_info").getAsJsonObject().get("topic_url");
- String[] urlParts = dmaapUrlSplit(topicurl.toString().replace("\"", ""));
-
- String mrTopic = null;
- String mrUrl = null;
- String[] hostport = null;
- String username = null;
- String userpwd = null;
-
- try {
-
- if (null != urlParts) {
- mrUrl = urlParts[2];
-
- // DCAE internal dmaap topic convention
- if (urlParts[3].equals("events")) {
- mrTopic = urlParts[4];
- } else {
- // ONAP dmaap topic convention
- mrTopic = urlParts[3];
- hostport = mrUrl.split(":");
- }
-
- }
- } catch (NullPointerException e) {
- System.out.println("NullPointerException");
- e.getMessage();
- }
-
- if (entry.getValue().getAsJsonObject().has("aaf_username")) {
- username = entry.getValue().getAsJsonObject().get("aaf_username").toString().replace("\"", "");
- }
- if (entry.getValue().getAsJsonObject().has("aaf_password")) {
- userpwd = entry.getValue().getAsJsonObject().get("aaf_password").toString().replace("\"", "");
- }
- if (hostport == null) {
- log.debug("TOPIC:" + mrTopic + " HOST-URL:" + mrUrl + " PWD:" + userpwd + " USER:" + username);
- } else {
- log.debug("TOPIC:" + mrTopic + " HOST-URL:" + mrUrl + " HOSTS:" + hostport[0] + " PWD:"
- + userpwd + " USER:" + username + " NAME:" + entry.getKey());
- }
-
- dmaap_hash.put(entry.getKey() + ".cambria.topic", mrTopic);
-
- if (!(hostport == null)) {
- dmaap_hash.put(entry.getKey() + ".cambria.hosts", hostport[0]);
- }
-
- if (!(mrUrl == null)) {
- dmaap_hash.put(entry.getKey() + ".cambria.url", mrUrl);
- }
-
- if (!(username == null)) {
- dmaap_hash.put(entry.getKey() + ".basicAuthUsername", username);
- }
-
- if (!(userpwd == null)) {
- dmaap_hash.put(entry.getKey() + ".basicAuthPassword", userpwd);
- }
-
- }
-
- }
-
- } catch (JsonIOException | JsonSyntaxException |
-
- FileNotFoundException e1) {
- e1.printStackTrace();
- log.error("Problem loading Dmaap Channel configuration file: " + e1.toString());
- } finally {
- if (fr != null) {
- try {
- fr.close();
- } catch (IOException e) {
- log.error("Error closing file reader stream : " + e.toString());
- }
- }
- }
-
- }
-
- /***
- * Dmaap url structure pub - https://<dmaaphostname>:<port>/events/
- * <namespace>.<dmaapcluster>.<topic>, sub - https://<dmaaphostname>:
- * <port>/events/<namespace>.<dmaapcluster>.<topic>/G1/u1";
- *
- * Onap url structure pub - http://<dmaaphostname>:<port>/<unauthenticated>.
- * <topic>,
- */
-
- private String[] dmaapUrlSplit(String dmUrl) {
- String[] multUrls = dmUrl.split(",");
-
- StringBuffer newUrls = new StringBuffer();
- String urlParts[] = null;
- for (int i = 0; i < multUrls.length; i++) {
- urlParts = multUrls[i].split("/");
- if (i == 0) {
- newUrls = newUrls.append(urlParts[2]);
- } else {
- newUrls = newUrls.append(",").append(urlParts[2]);
- }
- }
- return urlParts;
- }
-
- public static synchronized DmaapPropertyReader getInstance(String ChannelConfig) {
- if (instance == null) {
- instance = new DmaapPropertyReader(ChannelConfig);
- }
- return instance;
- }
-
- public String getKeyValue(String HashKey) {
- return this.dmaap_hash.get(HashKey);
- }
-}
diff --git a/src/main/java/org/onap/dcae/commonFunction/Event.java b/src/main/java/org/onap/dcae/commonFunction/Event.java
new file mode 100644
index 00000000..faae2451
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/Event.java
@@ -0,0 +1,34 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.commonFunction;
+
+import com.google.gson.JsonObject;
+
+import java.util.List;
+
+class Event {
+ final JsonObject filter;
+ final List<Processor> processors;
+
+ Event(JsonObject filter, List<Processor> processors) {
+ this.filter = filter;
+ this.processors = processors;
+ }
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
index 2bc5e45b..a57ea3f0 100644
--- a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
+++ b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
@@ -1,193 +1,172 @@
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.commonFunction;
-
-import com.att.nsa.clock.SaClock;
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.log4j.EcompFields;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonParser;
-import org.json.JSONArray;
-import org.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.FileReader;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.TimeZone;
-
-public class EventProcessor implements Runnable {
-
- private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
- private static final String EVENT_LITERAL = "event";
- private static final String COMMON_EVENT_HEADER = "commonEventHeader";
-
- private static HashMap<String, String[]> streamid_hash = new HashMap<String, String[]>();
- public JSONObject event;
-
- public EventProcessor() {
- log.debug("EventProcessor: Default Constructor");
-
- String[] list = CommonStartup.streamid.split("\\|");
- for (String aList : list) {
- String domain = aList.split("=")[0];
- //String streamIdList[] = list[i].split("=")[1].split(",");
- String[] streamIdList = aList.substring(aList.indexOf('=') + 1).split(",");
-
- log.debug(String.format("Domain: %s streamIdList:%s", domain,
- Arrays.toString(streamIdList)));
- streamid_hash.put(domain, streamIdList);
- }
-
- }
-
- @Override
- public void run() {
-
- try {
-
- event = CommonStartup.fProcessingInputQueue.take();
- log.info("EventProcessor\tRemoving element: " + event);
-
- //EventPublisher Ep=new EventPublisher();
- while (event != null) {
- // As long as the producer is running we remove elements from the queue.
-
- //UUID uuid = UUID.fromString(event.get("VESuniqueId").toString());
- String uuid = event.get("VESuniqueId").toString();
- LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
- localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
-
- log.debug("event.VESuniqueId" + event.get("VESuniqueId")
- + "event.commonEventHeader.domain:" + event.getJSONObject(EVENT_LITERAL)
- .getJSONObject(COMMON_EVENT_HEADER).getString("domain"));
- String[] streamIdList = streamid_hash.get(
- event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER)
- .getString("domain"));
- log.debug("streamIdList:" + streamIdList);
-
- if (streamIdList.length == 0) {
- log.error("No StreamID defined for publish - Message dropped" + event);
- } else {
- for (String aStreamIdList : streamIdList) {
- log.info("Invoking publisher for streamId:" + aStreamIdList);
- this.overrideEvent();
- EventPublisher.getInstance(aStreamIdList).sendEvent(event);
-
- }
- }
- log.debug("Message published" + event);
- event = CommonStartup.fProcessingInputQueue.take();
- // log.info("EventProcessor\tRemoving element: " + this.queue.remove());
- }
- } catch (InterruptedException e) {
- log.error("EventProcessor InterruptedException" + e.getMessage());
- }
-
- }
-
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- public void overrideEvent() {
- //Set collector timestamp in event payload before publish
- final Date currentTime = new Date();
- final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");
- sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
-
- /*JSONArray additionalParametersarray = new JSONArray().put(new JSONObject().put("collectorTimeStamp", sdf.format(currentTime)));
- JSONObject additionalParameter = new JSONObject().put("additionalParameters",additionalParametersarray );
- JSONObject commonEventHeaderkey = event.getJSONObject("event").getJSONObject("commonEventHeader");
- commonEventHeaderkey.put("internalHeaderFields", additionalParameter);*/
-
-
-/* "event": {
- "commonEventHeader": {
- "internalHeaderFields": {
- "collectorTimeStamp": "Fri, 04 21 2017 04:11:52 GMT"
- },
-*/
-
- //JSONArray additionalParametersarray = new JSONArray().put(new JSONObject().put("collectorTimeStamp", sdf.format(currentTime)));
- JSONObject collectorTimeStamp = new JSONObject()
- .put("collectorTimeStamp", sdf.format(currentTime));
- JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL)
- .getJSONObject(COMMON_EVENT_HEADER);
- commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp);
- event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey);
-
- if (CommonStartup.eventTransformFlag == 1) {
- // read the mapping json file
- final JsonParser parser = new JsonParser();
- try {
- final JsonArray jo = (JsonArray) parser
- .parse(new FileReader("./etc/eventTransform.json"));
- log.info("parse eventTransform.json");
- // now convert to org.json
- final String jsonText = jo.toString();
- final JSONArray topLevel = new JSONArray(jsonText);
- //log.info("topLevel == " + topLevel);
-
- Class[] paramJSONObject = new Class[1];
- paramJSONObject[0] = JSONObject.class;
- //load VESProcessors class at runtime
- Class cls = Class.forName("org.onap.dcae.commonFunction.ConfigProcessors");
- Constructor constr = cls.getConstructor(paramJSONObject);
- Object obj = constr.newInstance(event);
-
- for (int j = 0; j < topLevel.length(); j++) {
- JSONObject filterObj = topLevel.getJSONObject(j).getJSONObject("filter");
- Method method = cls.getDeclaredMethod("isFilterMet", paramJSONObject);
- boolean filterMet = (boolean) method.invoke(obj, filterObj);
- if (filterMet) {
- final JSONArray processors = topLevel.getJSONObject(j)
- .getJSONArray("processors");
-
- //call the processor method
- for (int i = 0; i < processors.length(); i++) {
- final JSONObject processorList = processors.getJSONObject(i);
- final String functionName = processorList.getString("functionName");
- final JSONObject args = processorList.getJSONObject("args");
- //final JSONObject filter = processorList.getJSONObject("filter");
-
- log.info(String.format("functionName==%s | args==%s", functionName,
- args));
- //reflect method call
- method = cls.getDeclaredMethod(functionName, paramJSONObject);
- method.invoke(obj, args);
- }
- }
- }
-
- } catch (Exception e) {
-
- log.error("EventProcessor Exception" + e.getMessage() + e);
- log.error("EventProcessor Exception" + e.getCause());
- }
- }
- log.debug("Modified event:" + event);
-
- }
-}
+/*-
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.commonFunction;
+
+import com.att.nsa.clock.SaClock;
+import com.att.nsa.logging.LoggingContext;
+import com.att.nsa.logging.log4j.EcompFields;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import org.json.JSONObject;
+import org.onap.dcae.commonFunction.event.publishing.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileReader;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class EventProcessor implements Runnable {
+
+ private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
+ private static final String EVENT_LITERAL = "event";
+ private static final String COMMON_EVENT_HEADER = "commonEventHeader";
+ static final Type EVENT_LIST_TYPE = new TypeToken<List<Event>>() {}.getType();
+ private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");
+
+ static Map<String, String[]> streamidHash = new HashMap<>();
+ public JSONObject event;
+ private EventPublisher eventPublisher;
+
+ public EventProcessor(EventPublisher eventPublisher) {
+ this.eventPublisher = eventPublisher;
+ streamidHash = CommonStartup.streamID.toJavaMap();
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ event = CommonStartup.fProcessingInputQueue.take();
+ // As long as the producer is running we remove elements from
+ // the queue.
+ log.info("QueueSize:" + CommonStartup.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event);
+
+ String uuid = event.get("VESuniqueId").toString();
+ LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
+ localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
+
+ String domain = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain");
+ log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + domain);
+ String[] streamIdList = streamidHash.get(domain);
+ log.debug("streamIdList:" + Arrays.toString(streamIdList));
+
+ if (streamIdList.length == 0) {
+ log.error("No StreamID defined for publish - Message dropped" + event);
+ } else {
+ sendEventsToStreams(streamIdList);
+ }
+ log.debug("Message published" + event);
+ }
+ } catch (InterruptedException e) {
+ log.error("EventProcessor InterruptedException" + e.getMessage());
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public void overrideEvent() {
+ // Set collector timestamp in event payload before publish
+ addCurrentTimeToEvent(event);
+
+ if (CommonStartup.eventTransformFlag) {
+ // read the mapping json file
+ try (FileReader fr = new FileReader("./etc/eventTransform.json")) {
+ log.info("parse eventTransform.json");
+ List<Event> events = new Gson().fromJson(fr, EVENT_LIST_TYPE);
+ parseEventsJson(events, new ConfigProcessorAdapter(new ConfigProcessors(event)));
+ } catch (IOException e) {
+ log.error("Couldn't find file ./etc/eventTransform.json" + e.toString());
+ }
+ }
+ // Remove VESversion from event. This field is for internal use and must be removed after use.
+ if (event.has("VESversion"))
+ event.remove("VESversion");
+
+ log.debug("Modified event:" + event);
+ }
+
+ private void sendEventsToStreams(String[] streamIdList) {
+ for (String aStreamIdList : streamIdList) {
+ log.info("Invoking publisher for streamId:" + aStreamIdList);
+ this.overrideEvent();
+ eventPublisher.sendEvent(event, aStreamIdList);
+ }
+ }
+
+ private void addCurrentTimeToEvent(JSONObject event) {
+ final Date currentTime = new Date();
+ JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", dateFormat.format(currentTime));
+ JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER);
+ commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp);
+ event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey);
+ }
+
+ void parseEventsJson(List<Event> eventsTransform, ConfigProcessorAdapter configProcessorAdapter) {
+ // load VESProcessors class at runtime
+ for (Event eventTransform : eventsTransform) {
+ JSONObject filterObj = new JSONObject(eventTransform.filter.toString());
+ if (configProcessorAdapter.isFilterMet(filterObj)) {
+ callProcessorsMethod(configProcessorAdapter, eventTransform.processors);
+ }
+ }
+ }
+
+ private void callProcessorsMethod(ConfigProcessorAdapter configProcessorAdapter, List<Processor> processors) {
+ // call the processor method
+ for (Processor processor : processors) {
+ final String functionName = processor.functionName;
+ final JSONObject args = new JSONObject(processor.args.toString());
+
+ log.info(String.format("functionName==%s | args==%s", functionName, args));
+ // reflect method call
+ try {
+ configProcessorAdapter.runConfigProcessorFunctionByName(functionName, args);
+ } catch (ReflectiveOperationException e) {
+ log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause());
+ }
+ }
+ }
+
+ static class ConfigProcessorAdapter {
+ private final ConfigProcessors configProcessors;
+
+ ConfigProcessorAdapter(ConfigProcessors configProcessors) {
+ this.configProcessors = configProcessors;
+ }
+
+ boolean isFilterMet(JSONObject parameter) {
+ return configProcessors.isFilterMet(parameter);
+ }
+
+ void runConfigProcessorFunctionByName(String functionName, JSONObject parameter) throws ReflectiveOperationException {
+ Method method = configProcessors.getClass().getDeclaredMethod(functionName, parameter.getClass());
+ method.invoke(configProcessors, parameter);
+ }
+ }
+}
+
diff --git a/src/main/java/org/onap/dcae/commonFunction/EventPublisher.java b/src/main/java/org/onap/dcae/commonFunction/EventPublisher.java
deleted file mode 100644
index d76299df..00000000
--- a/src/main/java/org/onap/dcae/commonFunction/EventPublisher.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.commonFunction;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.clock.SaClock;
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.log4j.EcompFields;
-import org.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.security.GeneralSecurityException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-
-public class EventPublisher {
-
- private static final String VES_UNIQUE_ID = "VESuniqueId";
- private static EventPublisher instance;
- private static CambriaBatchingPublisher pub;
-
- private String streamid = "";
- private String ueburl = "";
- private String topic = "";
- private String authuser = "";
- private String authpwd = "";
-
- private static Logger log = LoggerFactory.getLogger(EventPublisher.class);
-
-
- private EventPublisher(String newstreamid) {
-
- streamid = newstreamid;
- try {
- ueburl = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash
- .get(streamid + ".cambria.url");
-
- if (ueburl == null) {
- ueburl = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash
- .get(streamid + ".cambria.hosts");
- }
- topic = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile)
- .getKeyValue(streamid + ".cambria.topic");
- authuser = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile)
- .getKeyValue(streamid + ".basicAuthUsername");
-
- if (authuser != null) {
- authpwd = DmaapPropertyReader
- .getInstance(CommonStartup.cambriaConfigFile).dmaap_hash
- .get(streamid + ".basicAuthPassword");
- }
- } catch (Exception e) {
- log.error("CambriaClientBuilders connection reader exception : " + e.getMessage());
-
- }
-
- }
-
-
- /**
- * Returns event publisher
- *
- * @param streamid stream id
- * @return event publisher
- */
- public static synchronized EventPublisher getInstance(String streamid) {
- if (instance == null) {
- instance = new EventPublisher(streamid);
- }
- if (!instance.streamid.equals(streamid)) {
- instance.closePublisher();
- instance = new EventPublisher(streamid);
- }
- return instance;
-
- }
-
-
- /**
- *
- * @param event json object for event
- */
- public synchronized void sendEvent(JSONObject event) {
-
- log.debug("EventPublisher.sendEvent: instance for publish is ready");
-
- if (event.has(VES_UNIQUE_ID)) {
- String uuid = event.get(VES_UNIQUE_ID).toString();
- LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
- localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
- log.debug("Removing VESuniqueid object from event");
- event.remove(VES_UNIQUE_ID);
- }
-
- try {
-
- if (authuser != null) {
- log.debug(String.format("URL:%sTOPIC:%sAuthUser:%sAuthpwd:%s", ueburl, topic,
- authuser, authpwd));
- pub = new CambriaClientBuilders.PublisherBuilder()
- .usingHosts(ueburl)
- .onTopic(topic)
- .usingHttps()
- .authenticatedByHttp(authuser, authpwd)
- .logSendFailuresAfter(5)
- // .logTo(log)
- // .limitBatch(100, 10)
- .build();
- } else {
-
- log.debug(String.format("URL:%sTOPIC:%s", ueburl, topic));
- pub = new CambriaClientBuilders.PublisherBuilder()
- .usingHosts(ueburl)
- .onTopic(topic)
- // .logTo(log)
- .logSendFailuresAfter(5)
- // .limitBatch(100, 10)
- .build();
-
- }
-
- int pendingMsgs = pub.send("MyPartitionKey", event.toString());
- //this.wait(2000);
-
- if (pendingMsgs > 100) {
- log.info("Pending Message Count=" + pendingMsgs);
- }
-
- closePublisher();
- log.info("pub.send invoked - no error");
- CommonStartup.oplog.info(String.format("URL:%sTOPIC:%sEvent Published:%s",
- ueburl, topic, event));
-
- } catch (IOException | GeneralSecurityException | IllegalArgumentException e) {
- log.error("Unable to publish event: {} streamid: {}. Exception: {}", event, streamid, e);
- }
- finally {
- closePublisher();
- }
-
- }
-
-
- public synchronized void closePublisher() {
-
- try {
- if (pub != null) {
- final List<?> stuck = pub.close(20, TimeUnit.SECONDS);
- if (!stuck.isEmpty()) {
- log.error(stuck.size() + " messages unsent");
- }
- }
- } catch (InterruptedException | IOException e) {
- log.error("Caught Exception on Close event: {}", e);
- }
-
- }
-}
diff --git a/src/main/java/org/onap/dcae/commonFunction/Processor.java b/src/main/java/org/onap/dcae/commonFunction/Processor.java
new file mode 100644
index 00000000..ea79f1d3
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/Processor.java
@@ -0,0 +1,33 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.commonFunction;
+
+import com.google.gson.JsonObject;
+
+class Processor {
+ final String functionName;
+ final JsonObject args;
+
+ Processor(String functionName, JsonObject args) {
+ this.functionName = functionName;
+ this.args = args;
+ }
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/VESLogger.java b/src/main/java/org/onap/dcae/commonFunction/VESLogger.java
index 79108443..a967327e 100644
--- a/src/main/java/org/onap/dcae/commonFunction/VESLogger.java
+++ b/src/main/java/org/onap/dcae/commonFunction/VESLogger.java
@@ -33,126 +33,130 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.UUID;
-
public class VESLogger {
- public static final String VES_AGENT = "VES_AGENT";
- private static final String REQUEST_ID = "requestId";
- private static final String IP_ADDRESS ="127.0.0.1";
- private static final String HOST_NAME="localhost";
-
- public static Logger auditLog;
- public static Logger metricsLog;
- public static Logger errorLog;
- public static Logger debugLog;
-
- // Common LoggingContext
- private static LoggingContext commonLC;
- // Thread-specific LoggingContext
- private static LoggingContext threadLC;
- public LoggingContext lc;
-
-
- /**
- * Returns the common LoggingContext instance that is the base context
- * for all subsequent instances.
- *
- * @return the common LoggingContext
- */
- public static LoggingContext getCommonLoggingContext() {
- if (commonLC == null) {
- commonLC = new LoggingContextFactory.Builder().build();
- final UUID uuid = UUID.randomUUID();
-
- commonLC.put(REQUEST_ID, uuid.toString());
- }
- return commonLC;
- }
-
- /**
- * Get a logging context for the current thread that's based on the common logging context.
- * Populate the context with context-specific values.
- *
- * @param aUuid uuid for request id
- * @return a LoggingContext for the current thread
- */
- public static LoggingContext getLoggingContextForThread(UUID aUuid) {
- // note that this operation requires everything from the common context
- // to be (re)copied into the target context. That seems slow, but it actually
- // helps prevent the thread from overwriting supposedly common data. It also
- // should be fairly quick compared with the overhead of handling the actual
- // service call.
-
- threadLC = new LoggingContextFactory.Builder().
- withBaseContext(getCommonLoggingContext()).
- build();
- // Establish the request-specific UUID, as long as we are here...
- threadLC.put(REQUEST_ID, aUuid.toString());
- threadLC.put(EcompFields.kEndTimestamp, SaClock.now());
-
- return threadLC;
- }
-
- /**
- * Get a logging context for the current thread that's based on the common logging context.
- * Populate the context with context-specific values.
- *
- * @param aUuid uuid for request id
- * @return a LoggingContext for the current thread
- */
- public static LoggingContext getLoggingContextForThread(String aUuid) {
- // note that this operation requires everything from the common context
- // to be (re)copied into the target context. That seems slow, but it actually
- // helps prevent the thread from overwriting supposedly common data. It also
- // should be fairly quick compared with the overhead of handling the actual
- // service call.
-
- threadLC = new LoggingContextFactory.Builder().
- withBaseContext(getCommonLoggingContext()).
- build();
- // Establish the request-specific UUID, as long as we are here...
- threadLC.put(REQUEST_ID, aUuid);
- threadLC.put("statusCode", "COMPLETE");
- threadLC.put(EcompFields.kEndTimestamp, SaClock.now());
- return threadLC;
- }
-
- public static void setUpEcompLogging() {
-
- // Create ECOMP Logger instances
- auditLog = LoggerFactory.getLogger("com.att.ecomp.audit");
- metricsLog = LoggerFactory.getLogger("com.att.ecomp.metrics");
- debugLog = LoggerFactory.getLogger("com.att.ecomp.debug");
- errorLog = LoggerFactory.getLogger("com.att.ecomp.error");
-
- final LoggingContext lc = getCommonLoggingContext();
-
- String ipAddr = IP_ADDRESS;
- String hostname = HOST_NAME;
- try {
- final InetAddress ip = InetAddress.getLocalHost();
- hostname = ip.getCanonicalHostName();
- ipAddr = ip.getHostAddress();
- } catch (UnknownHostException x) {
- Log.debug(x.getMessage());
- }
-
- lc.put("serverName", hostname);
- lc.put("serviceName", "VESCollecor");
- lc.put("statusCode", "RUNNING");
- lc.put("targetEntity", "NULL");
- lc.put("targetServiceName", "NULL");
- lc.put("server", hostname);
- lc.put("serverIpAddress", ipAddr);
-
- // instance UUID is meaningless here, so we just create a new one each time the
- // server starts. One could argue each new instantiation of the service should
- // have a new instance ID.
- lc.put("instanceUuid", "");
- lc.put("severity", "");
- lc.put(EcompFields.kEndTimestamp, SaClock.now());
- lc.put("EndTimestamp", SaClock.now());
- lc.put("partnerName", "NA");
- }
+ public static final String VES_AGENT = "VES_AGENT";
+ public static final String REQUEST_ID = "requestId";
+ private static final String IP_ADDRESS = "127.0.0.1";
+ private static final String HOST_NAME = "localhost";
+
+ public static Logger auditLog;
+ public static Logger metricsLog;
+ public static Logger errorLog;
+ public static Logger debugLog;
+
+ // Common LoggingContext
+ private static LoggingContext commonLC;
+ // Thread-specific LoggingContext
+ private static LoggingContext threadLC;
+ public LoggingContext lc;
+
+ /**
+ * Returns the common LoggingContext instance that is the base context for
+ * all subsequent instances.
+ *
+ * @return the common LoggingContext
+ */
+ public static LoggingContext getCommonLoggingContext() {
+ if (commonLC == null) {
+ commonLC = new LoggingContextFactory.Builder().build();
+ final UUID uuid = UUID.randomUUID();
+
+ commonLC.put(REQUEST_ID, uuid.toString());
+ }
+ return commonLC;
+ }
+
+ /**
+ * Get a logging context for the current thread that's based on the common
+ * logging context. Populate the context with context-specific values.
+ *
+ * @param aUuid
+ * uuid for request id
+ * @return a LoggingContext for the current thread
+ */
+ public static LoggingContext getLoggingContextForThread(UUID aUuid) {
+ // note that this operation requires everything from the common context
+ // to be (re)copied into the target context. That seems slow, but it
+ // actually
+ // helps prevent the thread from overwriting supposedly common data. It
+ // also
+ // should be fairly quick compared with the overhead of handling the
+ // actual
+ // service call.
+
+ threadLC = new LoggingContextFactory.Builder().withBaseContext(getCommonLoggingContext()).build();
+ // Establish the request-specific UUID, as long as we are here...
+ threadLC.put(REQUEST_ID, aUuid.toString());
+ threadLC.put(EcompFields.kEndTimestamp, SaClock.now());
+
+ return threadLC;
+ }
+
+ /**
+ * Get a logging context for the current thread that's based on the common
+ * logging context. Populate the context with context-specific values.
+ *
+ * @param aUuid
+ * uuid for request id
+ * @return a LoggingContext for the current thread
+ */
+ public static LoggingContext getLoggingContextForThread(String aUuid) {
+ // note that this operation requires everything from the common context
+ // to be (re)copied into the target context. That seems slow, but it
+ // actually
+ // helps prevent the thread from overwriting supposedly common data. It
+ // also
+ // should be fairly quick compared with the overhead of handling the
+ // actual
+ // service call.
+
+ threadLC = new LoggingContextFactory.Builder().withBaseContext(getCommonLoggingContext()).build();
+ // Establish the request-specific UUID, as long as we are here...
+ threadLC.put(REQUEST_ID, aUuid);
+ threadLC.put("statusCode", "COMPLETE");
+ threadLC.put(EcompFields.kEndTimestamp, SaClock.now());
+ return threadLC;
+ }
+
+ public static void setUpEcompLogging() {
+
+ // Create ECOMP Logger instances
+ auditLog = LoggerFactory.getLogger("com.att.ecomp.audit");
+ metricsLog = LoggerFactory.getLogger("com.att.ecomp.metrics");
+ debugLog = LoggerFactory.getLogger("com.att.ecomp.debug");
+ errorLog = LoggerFactory.getLogger("com.att.ecomp.error");
+
+ final LoggingContext lc = getCommonLoggingContext();
+
+ String ipAddr = IP_ADDRESS;
+ String hostname = HOST_NAME;
+ try {
+ final InetAddress ip = InetAddress.getLocalHost();
+ hostname = ip.getCanonicalHostName();
+ ipAddr = ip.getHostAddress();
+ } catch (UnknownHostException x) {
+ Log.debug(x.getMessage());
+ }
+
+ lc.put("serverName", hostname);
+ lc.put("serviceName", "VESCollecor");
+ lc.put("statusCode", "RUNNING");
+ lc.put("targetEntity", "NULL");
+ lc.put("targetServiceName", "NULL");
+ lc.put("server", hostname);
+ lc.put("serverIpAddress", ipAddr);
+
+ // instance UUID is meaningless here, so we just create a new one each
+ // time the
+ // server starts. One could argue each new instantiation of the service
+ // should
+ // have a new instance ID.
+ lc.put("instanceUuid", "");
+ lc.put("severity", "");
+ lc.put(EcompFields.kEndTimestamp, SaClock.now());
+ lc.put("EndTimestamp", SaClock.now());
+ lc.put("partnerName", "NA");
+ }
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java
new file mode 100644
index 00000000..5865b12c
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java
@@ -0,0 +1,107 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.commonFunction.event.publishing;
+
+import static io.vavr.API.List;
+import static io.vavr.API.Try;
+import static io.vavr.API.Tuple;
+import static io.vavr.API.unchecked;
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError;
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
+
+import io.vavr.collection.List;
+import io.vavr.collection.Map;
+import io.vavr.control.Option;
+import io.vavr.control.Try;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.onap.dcae.commonFunction.AnyNode;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+@SuppressWarnings("mapFailure takes a generic varargs, unchecked because of Javas type system limitation, actually safe to do")
+public final class DMaaPConfigurationParser {
+
+ public static Try<Map<String, PublisherConfig>> parseToDomainMapping(Path configLocation) {
+ return readFromFile(configLocation)
+ .flatMap(DMaaPConfigurationParser::toJSON)
+ .flatMap(DMaaPConfigurationParser::toConfigMap);
+ }
+
+ private static Try<String> readFromFile(Path configLocation) {
+ return Try(() -> new String(Files.readAllBytes(configLocation)))
+ .mapFailure(enhanceError(f("Could not read DMaaP configuration from location: '%s'", configLocation)));
+ }
+
+ private static Try<AnyNode> toJSON(String config) {
+ return Try(() -> AnyNode.fromString(config))
+ .mapFailure(enhanceError(f("DMaaP configuration '%s' is not a valid JSON document", config)));
+ }
+
+ private static Try<Map<String, PublisherConfig>> toConfigMap(AnyNode config) {
+ return Try(() -> usesLegacyFormat(config) ? parseLegacyFormat(config) : parseNewFormat(config))
+ .mapFailure(enhanceError(f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config)));
+ }
+
+ private static boolean usesLegacyFormat(AnyNode dMaaPConfig) {
+ return dMaaPConfig.has("channels");
+ }
+
+ private static Map<String, PublisherConfig> parseLegacyFormat(AnyNode root) {
+ return root.get("channels").toList().toMap(
+ channel -> channel.get("name").toString(),
+ channel -> {
+ String destinationsStr = channel.getAsOption("cambria.url")
+ .getOrElse(channel.getAsOption("cambria.hosts").get())
+ .toString();
+ String topic = channel.get("cambria.topic").toString();
+ Option<String> maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString);
+ Option<String> maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString);
+ List<String> destinations = List(destinationsStr.split(","));
+ return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
+ });
+ }
+
+ private static Map<String, PublisherConfig> parseNewFormat(AnyNode root) {
+ return root.keys().toMap(
+ channelName -> channelName,
+ channelName -> {
+ AnyNode channelConfig = root.get(channelName);
+ Option<String> maybeUser = channelConfig.getAsOption("aaf_username").map(AnyNode::toString);
+ Option<String> maybePassword = channelConfig.getAsOption("aaf_password").map(AnyNode::toString);
+ URL topicURL = unchecked(
+ () -> new URL(channelConfig.get("dmaap_info").get("topic_url").toString())).apply();
+ String[] pathSegments = topicURL.getPath().substring(1).split("/");
+ String topic = pathSegments[1];
+ String destination = "events".equals(pathSegments[0]) ? topicURL.getAuthority() : topicURL.getHost();
+ List<String> destinations = List(destination);
+ return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
+ });
+ }
+
+ private static PublisherConfig buildBasedOnAuth(Option<String> maybeUser, Option<String> maybePassword,
+ String topic, List<String> destinations) {
+ return maybeUser.flatMap(user -> maybePassword.map(password -> Tuple(user, password)))
+ .map(credentials -> new PublisherConfig(destinations, topic, credentials._1, credentials._2))
+ .getOrElse(new PublisherConfig(destinations, topic));
+ }
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java
new file mode 100644
index 00000000..fd9b3ae1
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java
@@ -0,0 +1,99 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.commonFunction.event.publishing;
+
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.clock.SaClock;
+import com.att.nsa.logging.LoggingContext;
+import com.att.nsa.logging.log4j.EcompFields;
+import io.vavr.collection.Map;
+import io.vavr.control.Try;
+import java.io.IOException;
+import org.json.JSONObject;
+import org.onap.dcae.commonFunction.VESLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+class DMaaPEventPublisher implements EventPublisher {
+ private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100;
+ private static final String VES_UNIQUE_ID = "VESuniqueId";
+ private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class);
+ private final DMaaPPublishersCache publishersCache;
+ private final Logger outputLogger;
+
+ DMaaPEventPublisher(DMaaPPublishersCache DMaaPPublishersCache,
+ Logger outputLogger) {
+ this.publishersCache = DMaaPPublishersCache;
+ this.outputLogger = outputLogger;
+ }
+
+ @Override
+ public void sendEvent(JSONObject event, String domain) {
+ clearVesUniqueIdFromEvent(event);
+ publishersCache.getPublisher(domain)
+ .onEmpty(() ->
+ log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event)))
+ .forEach(publisher -> sendEvent(event, domain, publisher));
+ }
+
+ @Override
+ public void reconfigure(Map<String, PublisherConfig> dMaaPConfig) {
+ publishersCache.reconfigure(dMaaPConfig);
+ }
+
+ private void sendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) {
+ Try.run(() -> uncheckedSendEvent(event, domain, publisher))
+ .onFailure(exc -> closePublisher(event, domain, exc));
+ }
+
+ private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher)
+ throws IOException {
+ int pendingMsgs = publisher.send("MyPartitionKey", event.toString());
+ if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) {
+ log.info("Pending messages count: " + pendingMsgs);
+ }
+ String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, domain);
+ log.info(infoMsg);
+ outputLogger.info(infoMsg);
+ }
+
+ private void closePublisher(JSONObject event, String domain, Throwable e) {
+ log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.",
+ event, domain), e);
+ publishersCache.closePublisherFor(domain);
+ }
+
+ private void clearVesUniqueIdFromEvent(JSONObject event) {
+ if (event.has(VES_UNIQUE_ID)) {
+ String uuid = event.get(VES_UNIQUE_ID).toString();
+ LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
+ localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
+ log.debug("Removing VESuniqueid object from event");
+ event.remove(VES_UNIQUE_ID);
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java
new file mode 100644
index 00000000..a7865a45
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java
@@ -0,0 +1,62 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.commonFunction.event.publishing;
+
+import static io.vavr.API.Try;
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError;
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
+import io.vavr.control.Try;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+final class DMaaPPublishersBuilder {
+
+ @SuppressWarnings("mapFailure takes a generic varargs, unchecked because of Javas type system limitation, actually safe to do")
+ static Try<CambriaBatchingPublisher> buildPublisher(PublisherConfig config) {
+ return Try(() -> builder(config).build())
+ .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config)));
+ }
+
+ private static PublisherBuilder builder(PublisherConfig config) {
+ if (config.isSecured()) {
+ return authenticatedBuilder(config);
+ } else {
+ return unAuthenticatedBuilder(config);
+ }
+ }
+
+ private static PublisherBuilder authenticatedBuilder(PublisherConfig config) {
+ return unAuthenticatedBuilder(config)
+ .usingHttps()
+ .authenticatedByHttp(config.userName().get(), config.password().get());
+ }
+
+ private static PublisherBuilder unAuthenticatedBuilder(PublisherConfig config) {
+ return new CambriaClientBuilders.PublisherBuilder()
+ .usingHosts(config.destinations().mkString(","))
+ .onTopic(config.topic())
+ .logSendFailuresAfter(5);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java
new file mode 100644
index 00000000..102d2774
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java
@@ -0,0 +1,124 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.commonFunction.event.publishing;
+
+import static io.vavr.API.Option;
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import io.vavr.collection.Map;
+import io.vavr.control.Option;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nonnull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+class DMaaPPublishersCache {
+
+ private static final Logger log = LoggerFactory.getLogger(DMaaPPublishersCache.class);
+ private final LoadingCache<String, CambriaBatchingPublisher> publishersCache;
+ private AtomicReference<Map<String, PublisherConfig>> dMaaPConfiguration;
+
+ DMaaPPublishersCache(Map<String, PublisherConfig> dMaaPConfiguration) {
+ this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration);
+ this.publishersCache = CacheBuilder.newBuilder()
+ .removalListener(new OnPublisherRemovalListener())
+ .build(new CambriaPublishersCacheLoader());
+ }
+
+ DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader,
+ OnPublisherRemovalListener onPublisherRemovalListener,
+ Map<String, PublisherConfig> dMaaPConfiguration) {
+ this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration);
+ this.publishersCache = CacheBuilder.newBuilder()
+ .removalListener(onPublisherRemovalListener)
+ .build(dMaaPPublishersCacheLoader);
+ }
+
+ Option<CambriaBatchingPublisher> getPublisher(String streamID) {
+ try {
+ return Option(publishersCache.getUnchecked(streamID));
+ } catch (Exception e) {
+ log.warn("Could not create / load Cambria Publisher for streamID", e);
+ return Option.none();
+ }
+ }
+
+ void closePublisherFor(String streamId) {
+ publishersCache.invalidate(streamId);
+ }
+
+ synchronized void reconfigure(Map<String, PublisherConfig> newConfig) {
+ Map<String, PublisherConfig> currentConfig = dMaaPConfiguration.get();
+ Map<String, PublisherConfig> removedConfigurations = currentConfig
+ .filterKeys(domain -> !newConfig.containsKey(domain));
+ Map<String, PublisherConfig> changedConfigurations = newConfig
+ .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e)));
+ dMaaPConfiguration.set(newConfig);
+ removedConfigurations.merge(changedConfigurations).forEach(e -> publishersCache.invalidate(e._1));
+ }
+
+ static class OnPublisherRemovalListener implements RemovalListener<String, CambriaBatchingPublisher> {
+
+ @Override
+ public void onRemoval(@Nonnull RemovalNotification<String, CambriaBatchingPublisher> notification) {
+ CambriaBatchingPublisher publisher = notification.getValue();
+ if (publisher != null) { // The value might get Garbage Collected at this moment, regardless of @Nonnull
+ try {
+ int timeout = 20;
+ TimeUnit unit = TimeUnit.SECONDS;
+ java.util.List<?> stuck = publisher.close(timeout, unit);
+ if (!stuck.isEmpty()) {
+ log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', "
+ + "%s messages were dropped", stuck.size(), timeout, unit));
+ }
+ } catch (InterruptedException | IOException e) {
+ log.error("Could not close Cambria publisher, some messages might have been dropped", e);
+ }
+ }
+ }
+ }
+
+ class CambriaPublishersCacheLoader extends CacheLoader<String, CambriaBatchingPublisher> {
+
+ @Override
+ public CambriaBatchingPublisher load(@Nonnull String domain) {
+ return dMaaPConfiguration.get()
+ .get(domain)
+ .toTry(() -> new RuntimeException(
+ f("DMaaP configuration contains no configuration for domain: '%s'", domain)))
+ .flatMap(DMaaPPublishersBuilder::buildPublisher)
+ .get();
+ }
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/EventPublisher.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/EventPublisher.java
new file mode 100644
index 00000000..9cd718f8
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/EventPublisher.java
@@ -0,0 +1,38 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.commonFunction.event.publishing;
+
+import io.vavr.collection.Map;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+public interface EventPublisher {
+
+ static EventPublisher createPublisher(Logger outputLogger, Map<String, PublisherConfig> dMaaPConfig) {
+ return new DMaaPEventPublisher(new DMaaPPublishersCache(dMaaPConfig), outputLogger);
+ }
+
+ void sendEvent(JSONObject event, String domain);
+
+ void reconfigure(Map<String, PublisherConfig> dMaaPConfig);
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java
new file mode 100644
index 00000000..4a056778
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java
@@ -0,0 +1,98 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.commonFunction.event.publishing;
+
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+import java.util.Objects;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+public final class PublisherConfig {
+
+ private final List<String> destinations;
+ private final String topic;
+ private String userName;
+ private String password;
+
+ PublisherConfig(List<String> destinations, String topic) {
+ this.destinations = destinations;
+ this.topic = topic;
+ }
+
+ PublisherConfig(List<String> destinations, String topic, String userName, String password) {
+ this.destinations = destinations;
+ this.topic = topic;
+ this.userName = userName;
+ this.password = password;
+ }
+
+ List<String> destinations() {
+ return destinations;
+ }
+
+ String topic() {
+ return topic;
+ }
+
+ Option<String> userName() {
+ return Option.of(userName);
+ }
+
+ Option<String> password() {
+ return Option.of(password);
+ }
+
+ boolean isSecured() {
+ return userName().isDefined() && password().isDefined();
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PublisherConfig that = (PublisherConfig) o;
+ return Objects.equals(destinations, that.destinations) &&
+ Objects.equals(topic, that.topic) &&
+ Objects.equals(userName, that.userName) &&
+ Objects.equals(password, that.password);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(destinations, topic, userName, password);
+ }
+
+ @Override
+ public String toString() {
+ return "PublisherConfig{" +
+ "destinations=" + destinations +
+ ", topic='" + topic + '\'' +
+ ", userName='" + userName + '\'' +
+ ", password='" + password + '\'' +
+ '}';
+ }
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java
new file mode 100644
index 00000000..9bf3ef8c
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.commonFunction.event.publishing;
+
+import static io.vavr.API.$;
+
+import io.vavr.API;
+import io.vavr.API.Match.Case;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+final class VavrUtils {
+
+ private VavrUtils() {
+ // utils aggregator
+ }
+
+ /**
+ * Shortcut for 'string interpolation'
+ */
+ static String f(String msg, Object... args) {
+ return String.format(msg, args);
+ }
+
+ /**
+ * Wrap failure with a more descriptive message of what has failed and chain original cause. Used to provide a
+ * context for errors instead of raw exception.
+ */
+ static Case<Throwable, Throwable> enhanceError(String msg) {
+ return API.Case($(), e -> new RuntimeException(msg, e));
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java b/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java
index 5ef44f5c..99e269c1 100644
--- a/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java
+++ b/src/main/java/org/onap/dcae/controller/FetchDynamicConfig.java
@@ -1,118 +1,190 @@
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.controller;
-
-import org.json.JSONArray;
-import org.json.JSONObject;
-import org.json.JSONTokener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Map;
-
-public class FetchDynamicConfig {
-
- private static final Logger log = LoggerFactory.getLogger(FetchDynamicConfig.class);
-
- static String configFile = "/opt/app/KV-Configuration.json";
- static String url;
- static String retString;
-
- public FetchDynamicConfig() {
- }
-
- public static void main(String[] args) {
- Map<String, String> env = System.getenv();
- for (Map.Entry<String, String> entry : env.entrySet()) {
- log.info("%s=%s%n", entry.getKey(), entry.getValue());
- }
-
- if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE")
- && env.containsKey("HOSTNAME")) {
- log.info(">>>Dynamic configuration to be fetched from ConfigBindingService");
- url = env.get("CONSUL_HOST") + ":8500/v1/catalog/service/" + env
- .get("CONFIG_BINDING_SERVICE");
-
- retString = executecurl(url);
- // consul return as array
- JSONTokener temp = new JSONTokener(retString);
- JSONObject cbsjobj = (JSONObject) new JSONArray(temp).get(0);
-
- String urlPart1 = null;
- if (cbsjobj.has("ServiceAddress") && cbsjobj.has("ServicePort")) {
- urlPart1 =
- cbsjobj.getString("ServiceAddress") + ":" + cbsjobj.getInt("ServicePort");
- }
-
- log.info("CONFIG_BINDING_SERVICE DNS RESOLVED:" + urlPart1);
- url = urlPart1 + "/service_component/" + env.get("HOSTNAME");
- retString = executecurl(url);
-
- JSONObject jsonObject = new JSONObject(new JSONTokener(retString));
- try (FileWriter file = new FileWriter(configFile)) {
- file.write(jsonObject.toString());
-
- log.info(
- "Successfully Copied JSON Object to file /opt/app/KV-Configuration.json");
- } catch (IOException e) {
- log.error(
- "Error in writing configuration into file /opt/app/KV-Configuration.json "
- + jsonObject, e);
- }
- } else {
- log.info(">>>Static configuration to be used");
- }
-
- }
-
- public static String executecurl(String url) {
-
- String[] command = {"curl", "-v", url};
- ProcessBuilder process = new ProcessBuilder(command);
- Process p;
- String result = null;
- try {
- p = process.start();
- InputStreamReader ipr = new InputStreamReader(p.getInputStream());
- BufferedReader reader = new BufferedReader(ipr);
- StringBuilder builder = new StringBuilder();
- String line;
-
- while ((line = reader.readLine()) != null) {
- builder.append(line);
- }
- result = builder.toString();
- log.info(result);
-
- reader.close();
- ipr.close();
- } catch (IOException e) {
- log.error("error", e);
- }
- return result;
-
- }
-
-}
+/*-
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.controller;
+
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Map;
+
+public class FetchDynamicConfig {
+
+ private static final Logger log = LoggerFactory.getLogger(FetchDynamicConfig.class);
+
+ public static String configFile = "/opt/app/KV-Configuration.json";
+ private static String url;
+ public static String retString;
+ public static String retCBSString;
+ private static Map<String, String> env;
+
+ public FetchDynamicConfig() {
+ }
+
+ public static void main(String[] args) {
+ Boolean areEqual;
+ // Call consul api and identify the CBS Service address and port
+ getconsul();
+ // Construct and invoke CBS API to get application Configuration
+ getCBS();
+ // Verify if data has changed
+ areEqual = verifyConfigChange();
+ // If new config then write data returned into configFile for
+ // LoadDynamicConfig process
+ if (! areEqual) {
+ FetchDynamicConfig fc = new FetchDynamicConfig();
+ fc.writefile(retCBSString);
+ } else {
+ log.info("New config pull results identical - " + configFile + " NOT refreshed");
+ }
+ }
+
+ private static void getconsul() {
+
+ env = System.getenv();
+ for (Map.Entry<String, String> entry : env.entrySet()) {
+ log.info(entry.getKey() + ":" + entry.getValue());
+ }
+
+ if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE")) {
+ // && env.containsKey("HOSTNAME")) {
+ log.info(">>>Dynamic configuration to be fetched from ConfigBindingService");
+ url = env.get("CONSUL_HOST") + ":8500/v1/catalog/service/" + env.get("CONFIG_BINDING_SERVICE");
+
+ retString = executecurl(url);
+
+ } else {
+ log.info(">>>Static configuration to be used");
+ }
+
+ }
+
+ public static boolean verifyConfigChange() {
+
+ boolean areEqual = false;
+ // Read current data
+ try {
+ File f = new File(configFile);
+ if (f.exists() && !f.isDirectory()) {
+
+ String jsonData = LoadDynamicConfig.readFile(configFile);
+ JSONObject jsonObject = new JSONObject(jsonData);
+
+ ObjectMapper mapper = new ObjectMapper();
+
+ JsonNode tree1 = mapper.readTree(jsonObject.toString());
+ JsonNode tree2 = mapper.readTree(retCBSString);
+ areEqual = tree1.equals(tree2);
+ log.info("Comparison value:" + areEqual);
+ } else {
+ log.info("First time config file read: " + configFile);
+ }
+
+ } catch (IOException e) {
+ log.error("Comparison with new fetched data failed" + e.getMessage());
+
+ }
+
+ return areEqual;
+
+ }
+
+ public static void getCBS() {
+
+ env = System.getenv();
+ // consul return as array
+ JSONTokener temp = new JSONTokener(retString);
+ JSONObject cbsjobj = (JSONObject) new JSONArray(temp).get(0);
+
+ String urlPart1 = null;
+ if (cbsjobj.has("ServiceAddress") && cbsjobj.has("ServicePort")) {
+ urlPart1 = cbsjobj.getString("ServiceAddress") + ":" + cbsjobj.getInt("ServicePort");
+ }
+
+ log.info("CONFIG_BINDING_SERVICE DNS RESOLVED:" + urlPart1);
+
+ if (env.containsKey("HOSTNAME")) {
+ url = urlPart1 + "/service_component/" + env.get("HOSTNAME");
+ retCBSString = executecurl(url);
+ } else if (env.containsKey("SERVICE_NAME")) {
+ url = urlPart1 + "/service_component/" + env.get("SERVICE_NAME");
+ retCBSString = executecurl(url);
+ } else {
+ log.error("Service name environment variable - HOSTNAME/SERVICE_NAME not found within container ");
+ }
+
+ }
+
+ public void writefile(String retCBSString) {
+ log.info("URL to fetch configuration:" + url + " Return String:" + retCBSString);
+
+ String indentedretstring = (new JSONObject(retCBSString)).toString(4);
+
+ try (FileWriter file = new FileWriter(FetchDynamicConfig.configFile)) {
+ file.write(indentedretstring);
+
+ log.info("Successfully Copied JSON Object to file " + configFile);
+ } catch (IOException e) {
+ log.error("Error in writing configuration into file " + configFile + retString + e.getMessage());
+ e.printStackTrace();
+ }
+
+ }
+
+ private static String executecurl(String url) {
+
+ String[] command = { "curl", "-v", url };
+ ProcessBuilder process = new ProcessBuilder(command);
+ Process p;
+ String result = null;
+ try {
+ p = process.start();
+ InputStreamReader ipr = new InputStreamReader(p.getInputStream());
+ BufferedReader reader = new BufferedReader(ipr);
+ StringBuilder builder = new StringBuilder();
+ String line;
+
+ while ((line = reader.readLine()) != null) {
+ builder.append(line);
+ }
+ result = builder.toString();
+ log.info(result);
+
+ reader.close();
+ ipr.close();
+ } catch (IOException e) {
+ log.error("error", e);
+ e.printStackTrace();
+ }
+ return result;
+
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java b/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java
index a28bca86..a8ecaba0 100644
--- a/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java
+++ b/src/main/java/org/onap/dcae/controller/LoadDynamicConfig.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* PROJECT
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,96 +30,100 @@ import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
-import java.io.InputStreamReader;
import java.util.Iterator;
import java.util.Map;
-
public class LoadDynamicConfig {
- private static final Logger log = LoggerFactory.getLogger(LoadDynamicConfig.class);
-
- public String propFile = "collector.properties";
- public String configFile = "/opt/app/KV-Configuration.json";
- static String url;
- static String retString;
-
- public LoadDynamicConfig() {
-
- }
-
- public static void main(String[] args) {
- Map<String, String> env = System.getenv();
- /*for (String envName : env.keySet()) {
- System.out.format("%s=%s%n", envName, env.get(envName));
- }*/
-
- //Check again to ensure new controller deployment related config
- if (env.containsKey("CONSUL_HOST") &&
- env.containsKey("CONFIG_BINDING_SERVICE") && env.containsKey("HOSTNAME")) {
-
- try {
-
- LoadDynamicConfig lc = new LoadDynamicConfig();
- String jsonData = readFile(lc.configFile);
- JSONObject jsonObject = new JSONObject(jsonData);
-
- PropertiesConfiguration conf;
- conf = new PropertiesConfiguration(lc.propFile);
- conf.setEncoding(null);
-
- // update properties based on consul dynamic configuration
- Iterator<?> keys = jsonObject.keys();
-
- while (keys.hasNext()) {
- String key = (String) keys.next();
- // check if any configuration is related to dmaap
- // and write into dmaapconfig.json
- if (key.startsWith("streams_publishes")) {
- //VESCollector only have publish streams
- try (FileWriter file = new FileWriter("./etc/DmaapConfig.json")) {
- file.write(jsonObject.get(key).toString());
- log.info("Successfully written JSON Object to DmaapConfig.json");
- file.close();
- } catch (IOException e) {
- log.info(
- "Error in writing dmaap configuration into DmaapConfig.json",
- e);
- }
- } else {
- conf.setProperty(key, jsonObject.get(key).toString());
- }
-
- }
- conf.save();
-
- } catch (ConfigurationException e) {
- log.error(e.getLocalizedMessage(), e);
-
- }
-
- } else {
- log.info(">>>Static configuration to be used");
- }
-
- }
-
- public static String readFile(String filename) {
- String result = "";
- try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
- StringBuilder sb = new StringBuilder();
- String line = br.readLine();
- while (line != null) {
- sb.append(line);
- line = br.readLine();
- }
- result = sb.toString();
- br.close();
- } catch (Exception e) {
- log.error(e.getLocalizedMessage(), e);
- }
- return result;
- }
-
+ private static final Logger log = LoggerFactory.getLogger(LoadDynamicConfig.class);
+
+ public String propFile = "collector.properties";
+ public String configFile = "/opt/app/KV-Configuration.json";
+ public String dMaaPOutputFile = "./etc/DmaapConfig.json";
+
+ public LoadDynamicConfig() {
+
+ }
+
+ public static void main(String[] args) {
+ Map<String, String> env = System.getenv();
+
+ // Check again to ensure new controller deployment related config
+ if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE")
+ && env.containsKey("HOSTNAME")) {
+
+ try {
+
+ LoadDynamicConfig lc = new LoadDynamicConfig();
+ String jsonData = readFile(lc.configFile);
+ JSONObject jsonObject = new JSONObject(jsonData);
+ lc.writeconfig(jsonObject);
+
+
+ } catch (Exception e) {
+ log.error(e.getLocalizedMessage(), e);
+ e.printStackTrace();
+
+ }
+
+ } else {
+ log.info(">>>Static configuration to be used");
+ }
+
+ }
+
+ public void writeconfig (JSONObject jsonObject)
+ {
+
+ PropertiesConfiguration conf;
+ try {
+ conf = new PropertiesConfiguration(propFile);
+
+ conf.setEncoding(null);
+
+ // update properties based on consul dynamic configuration
+ Iterator<?> keys = jsonObject.keys();
+
+ while (keys.hasNext()) {
+ String key = (String) keys.next();
+ // check if any configuration is related to dmaap
+ // and write into dmaapconfig.json
+ if (key.startsWith("streams_publishes")) {
+ // VESCollector only have publish streams
+ try (FileWriter file = new FileWriter(dMaaPOutputFile)) {
+ String indentedretstring=(new JSONObject(jsonObject.get(key).toString())).toString(4);
+ file.write(indentedretstring);
+ log.info("Successfully written JSON Object to DmaapConfig.json");
+ } catch (IOException e) {
+ log.info("Error in writing dmaap configuration into DmaapConfig.json", e);
+ }
+ } else {
+ conf.setProperty(key, jsonObject.get(key).toString());
+ }
+
+ }
+ conf.save();
+ } catch (ConfigurationException e) {
+ log.error(e.getLocalizedMessage(), e);
+ e.printStackTrace();
+ }
+ }
+
+ public static String readFile(String filename) {
+ String result = "";
+ try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
+ StringBuilder sb = new StringBuilder();
+ String line = br.readLine();
+ while (line != null) {
+ sb.append(line);
+ line = br.readLine();
+ }
+ result = sb.toString();
+ } catch (Exception e) {
+ log.error(e.getLocalizedMessage(), e);
+ e.printStackTrace();
+ }
+ return result;
+ }
}
diff --git a/src/main/java/org/onap/dcae/restapi/ApiException.java b/src/main/java/org/onap/dcae/restapi/ApiException.java
new file mode 100644
index 00000000..0f922678
--- /dev/null
+++ b/src/main/java/org/onap/dcae/restapi/ApiException.java
@@ -0,0 +1,70 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.restapi;
+
+import com.google.common.base.CaseFormat;
+import org.json.JSONObject;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+public enum ApiException {
+
+ INVALID_JSON_INPUT(ExceptionType.SERVICE_EXCEPTION, "SVC0002", "Incorrect JSON payload", 400),
+ SCHEMA_VALIDATION_FAILED(ExceptionType.SERVICE_EXCEPTION, "SVC0002", "Bad Parameter (JSON does not conform to schema)", 400),
+ INVALID_CONTENT_TYPE(ExceptionType.SERVICE_EXCEPTION, "SVC0002", "Bad Parameter (Incorrect request Content-Type)", 400),
+ UNAUTHORIZED_USER(ExceptionType.POLICY_EXCEPTION, "POL2000", "Unauthorized user", 401),
+ NO_SERVER_RESOURCES(ExceptionType.SERVICE_EXCEPTION, "SVC1000", "No server resources (internal processing queue full)", 503);
+
+ private final ExceptionType type;
+ private final String code;
+ private final String details;
+ public final int httpStatusCode;
+
+ ApiException(ExceptionType type, String code, String details, int httpStatusCode) {
+ this.type = type;
+ this.code = code;
+ this.details = details;
+ this.httpStatusCode = httpStatusCode;
+ }
+
+ public enum ExceptionType {
+ SERVICE_EXCEPTION, POLICY_EXCEPTION;
+
+ @Override
+ public String toString() {
+ return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, this.name());
+ }
+ }
+
+ public JSONObject toJSON() {
+ JSONObject exceptionTypeNode = new JSONObject();
+ exceptionTypeNode.put("messageId", code );
+ exceptionTypeNode.put("text", details);
+
+ JSONObject requestErrorNode = new JSONObject();
+ requestErrorNode.put(type.toString(), exceptionTypeNode);
+
+ JSONObject rootNode = new JSONObject();
+ rootNode.put("requestError", requestErrorNode);
+ return rootNode;
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java b/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java
index a8bfc24e..e5a29e9f 100644
--- a/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java
+++ b/src/main/java/org/onap/dcae/restapi/RestfulCollectorServlet.java
@@ -1,161 +1,127 @@
-
-/*
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.restapi;
-
-import java.io.IOException;
-import java.net.URL;
-
-import javax.servlet.ServletException;
-
-import org.apache.tomcat.util.codec.binary.Base64;
-import org.onap.dcae.commonFunction.CommonStartup;
-import org.onap.dcae.commonFunction.VESLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.nsa.apiServer.CommonServlet;
-import com.att.nsa.configs.ConfigDbException;
-import com.att.nsa.drumlin.service.framework.DrumlinErrorHandler;
-import com.att.nsa.drumlin.service.framework.context.DrumlinRequestContext;
-import com.att.nsa.drumlin.service.framework.routing.DrumlinRequestRouter;
-import com.att.nsa.drumlin.service.framework.routing.playish.DrumlinPlayishRoutingFileSource;
-import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.loadException;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
-import com.att.nsa.security.NsaAuthenticator;
-
-import com.att.nsa.security.authenticators.SimpleAuthenticator;
-import com.att.nsa.security.db.simple.NsaSimpleApiKey;
-
-
-public class RestfulCollectorServlet extends CommonServlet
-{
- String authid=null;
- String authpwd=null;
- public String authlist;
-
- public RestfulCollectorServlet ( rrNvReadable settings ) throws loadException, missingReqdSetting
- {
- super ( settings, "collector", false );
- //authid = settings.getString(CommonStartup.kSetting_authid,null);
- /*if (authid != null)
- {
- String authpwdtemp = settings.getString(CommonStartup.kSetting_authpwd,null);
- authpwd = new String(Base64.decodeBase64(authpwdtemp));
- }*/
- authlist = settings.getString(CommonStartup.kSetting_authlist,null);
- }
-
-
-
-
- /**
- * This is called once at server start. Use it to init any shared objects and setup the route mapping.
- */
- @Override
- protected void servletSetup () throws rrNvReadable.missingReqdSetting, rrNvReadable.invalidSettingValue, ServletException
- {
- super.servletSetup ();
-
- try
- {
- // the base class provides a bunch of things like API authentication and ECOMP compliant
- // logging. The Restful Collector likely doesn't need API authentication, so for now,
- // we init the base class services with an in-memory (and empty!) config DB.
- commonServletSetup ( ConfigDbType.MEMORY );
-
- VESLogger.setUpEcompLogging();
-
- // setup the servlet routing and error handling
- final DrumlinRequestRouter drr = getRequestRouter ();
-
- // you can tell the request router what to do when a particular kind of exception is thrown.
- drr.setHandlerForException( IllegalArgumentException.class, new DrumlinErrorHandler()
- {
- @Override
- public void handle ( DrumlinRequestContext ctx, Throwable cause )
- {
- sendJsonReply ( ctx, HttpStatusCodes.k400_badRequest, cause.getMessage() );
- }
- });
-
- // load the routes from the config file
- final URL routes = findStream ( "routes.conf" );
- if ( routes == null ) throw new rrNvReadable.missingReqdSetting ( "No routing configuration." );
- final DrumlinPlayishRoutingFileSource drs = new DrumlinPlayishRoutingFileSource ( routes );
- drr.addRouteSource ( drs );
-
- if (CommonStartup.authflag > 0) {
- NsaAuthenticator<NsaSimpleApiKey> NsaAuth;
- NsaAuth = AuthlistHandler(authlist);
-
- this.getSecurityManager().addAuthenticator(NsaAuth);
- }
-
- log.info ( "Restful Collector Servlet is up." );
- }
- catch ( SecurityException e )
- {
- throw new ServletException ( e );
- }
- catch ( IOException e )
- {
- throw new ServletException ( e );
- }
- catch ( ConfigDbException e )
- {
- throw new ServletException ( e );
- }
- }
-
- public NsaAuthenticator<NsaSimpleApiKey> AuthlistHandler (String authlist)
- {
- NsaAuthenticator<NsaSimpleApiKey> NsaAuth = new SimpleAuthenticator ();
- if (authlist != null)
- {
- String authpair[] = authlist.split("\\|");
- for (String pair: authpair) {
- String lineid[] = pair.split(",");
- String listauthid = lineid[0];
- String listauthpwd = new String(Base64.decodeBase64(lineid[1]));
- ((SimpleAuthenticator) NsaAuth).add(listauthid,listauthpwd);
- }
-
- }
- else if (authid != null)
- {
- ((SimpleAuthenticator) NsaAuth).add(authid,authpwd);
- }
- else
- {
- //add a default test account
- ((SimpleAuthenticator) NsaAuth).add("admin","collectorpasscode");
- }
- return NsaAuth;
-
- }
-
-
- private static final long serialVersionUID = 1L;
- private static final Logger log = LoggerFactory.getLogger ( RestfulCollectorServlet.class );
-}
+
+/*
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.restapi;
+
+import java.io.IOException;
+import java.net.URL;
+
+import javax.servlet.ServletException;
+
+import org.apache.tomcat.util.codec.binary.Base64;
+import org.onap.dcae.ApplicationSettings;
+import org.onap.dcae.commonFunction.CommonStartup;
+import org.onap.dcae.commonFunction.VESLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.att.nsa.apiServer.CommonServlet;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.drumlin.service.framework.routing.DrumlinRequestRouter;
+import com.att.nsa.drumlin.service.framework.routing.playish.DrumlinPlayishRoutingFileSource;
+import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.loadException;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
+import com.att.nsa.security.NsaAuthenticator;
+
+import com.att.nsa.security.authenticators.SimpleAuthenticator;
+import com.att.nsa.security.db.simple.NsaSimpleApiKey;
+
+public class RestfulCollectorServlet extends CommonServlet
+{
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger log = LoggerFactory.getLogger ( RestfulCollectorServlet.class );
+
+ private static String authCredentialsList;
+
+ public RestfulCollectorServlet ( ApplicationSettings settings ) throws loadException, missingReqdSetting
+ {
+ super ( settings.torrNvReadable(), "collector", false );
+ authCredentialsList = settings.validAuthorizationCredentials();
+ }
+
+
+
+
+ /**
+ * This is called once at server start. Use it to init any shared objects and setup the route mapping.
+ */
+ @Override
+ protected void servletSetup () throws rrNvReadable.missingReqdSetting, rrNvReadable.invalidSettingValue, ServletException
+ {
+ super.servletSetup ();
+
+ try {
+ // the base class provides a bunch of things like API authentication and ECOMP compliant
+ // logging. The Restful Collector likely doesn't need API authentication, so for now,
+ // we init the base class services with an in-memory (and empty!) config DB.
+ commonServletSetup ( ConfigDbType.MEMORY );
+
+ VESLogger.setUpEcompLogging();
+
+ // setup the servlet routing and error handling
+ final DrumlinRequestRouter drr = getRequestRouter ();
+
+ // you can tell the request router what to do when a particular kind of exception is thrown.
+ drr.setHandlerForException(IllegalArgumentException.class,
+ (ctx, cause) -> sendJsonReply (ctx, HttpStatusCodes.k400_badRequest, cause.getMessage() ));
+
+ // load the routes from the config file
+ final URL routes = findStream ( "routes.conf" );
+ if ( routes == null ) throw new rrNvReadable.missingReqdSetting ( "No routing configuration." );
+ final DrumlinPlayishRoutingFileSource drs = new DrumlinPlayishRoutingFileSource ( routes );
+ drr.addRouteSource ( drs );
+
+ if (CommonStartup.authflag) {
+ NsaAuthenticator<NsaSimpleApiKey> NsaAuth;
+ NsaAuth = createAuthenticator(authCredentialsList);
+
+ this.getSecurityManager().addAuthenticator(NsaAuth);
+ }
+
+ log.info ( "Restful Collector Servlet is up." );
+ }
+ catch ( SecurityException | IOException | ConfigDbException e ) {
+ throw new ServletException ( e );
+ }
+ }
+
+ public NsaAuthenticator<NsaSimpleApiKey> createAuthenticator(String authCredentials) {
+ NsaAuthenticator<NsaSimpleApiKey> authenticator = new SimpleAuthenticator();
+ if (authCredentials != null) {
+ String authpair[] = authCredentials.split("\\|");
+ for (String pair : authpair) {
+ String lineid[] = pair.split(",");
+ String listauthid = lineid[0];
+ String listauthpwd = new String(Base64.decodeBase64(lineid[1]));
+ ((SimpleAuthenticator) authenticator).add(listauthid, listauthpwd);
+ }
+
+ } else {
+ ((SimpleAuthenticator) authenticator).add("admin", "collectorpasscode");
+ }
+ return authenticator;
+ }
+
+}
+
diff --git a/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java b/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java
index e6b7d20c..d60e2a11 100644
--- a/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java
+++ b/src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java
@@ -1,278 +1,247 @@
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.restapi.endpoints;
-
-import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;
-import com.att.nsa.clock.SaClock;
-import com.att.nsa.drumlin.service.framework.context.DrumlinRequestContext;
-import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
-import com.att.nsa.drumlin.service.standards.MimeTypes;
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.log4j.EcompFields;
-import com.att.nsa.security.db.simple.NsaSimpleApiKey;
-import com.google.gson.JsonParser;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
-import org.onap.dcae.commonFunction.CommonStartup;
-import org.onap.dcae.commonFunction.CommonStartup.QueueFullException;
-import org.onap.dcae.commonFunction.CustomExceptionLoader;
-import org.onap.dcae.commonFunction.VESLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.UUID;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class EventReceipt extends NsaBaseEndpoint {
-
- private static final Logger log = LoggerFactory.getLogger(EventReceipt.class);
- private static final String MESSAGE = " Message:";
- static String valresult;
- static JSONObject customerror;
-
-
- public static void receiveVESEvent(DrumlinRequestContext ctx) {
- // the request body carries events. assume for now it's an array
- // of json objects that fits in memory. (See cambria's parsing for
- // handling large messages)
-
- NsaSimpleApiKey retkey = null;
-
- JSONArray jsonArray;
- JSONArray jsonArrayMod = new JSONArray();
- JSONObject event;
- JSONObject jsonObject;
- FileReader fr = null;
- InputStream istr = null;
- int arrayFlag = 0;
- String vesVersion = null;
-
- try {
- //System.out.print("Version string:" + version);
-
- // String br = new BufferedReader(new InputStreamReader(ctx.request().getBodyStream())).readLine();
- // JsonElement msg = new JsonParser().parse(new BufferedReader(new InputStreamReader(ctx.request().getBodyStream())).readLine());
- // jsonArray = new JSONArray ( new JSONTokener ( ctx.request().getBodyStream () ) );
-
- log.debug("Request recieved :" + ctx.request().getRemoteAddress());
- istr = ctx.request().getBodyStream();
- jsonObject = new JSONObject(new JSONTokener(istr));
-
- log.info("ctx getPathInContext: " + ctx.request().getPathInContext());
- Pattern p = Pattern.compile("(v\\d+)");
- Matcher m = p.matcher(ctx.request().getPathInContext());
-
- if (m.find()) {
- log.info("VES version:" + m.group());
- vesVersion = m.group();
- }
- if (ctx.request().getPathInContext().contains("eventBatch")) {
- CommonStartup.inlog.info(
- ctx.request().getRemoteAddress() + "VES Batch Input Messsage: " + jsonObject);
- log.info(
- ctx.request().getRemoteAddress() + "VES Batch Input Messsage: " + jsonObject);
- arrayFlag = 1;
- } else {
- CommonStartup.inlog
- .info(ctx.request().getRemoteAddress() + "Input Messsage: " + jsonObject);
- log.info(ctx.request().getRemoteAddress() + "Input Messsage: " + jsonObject);
-
- }
-
- UUID uuid = UUID.randomUUID();
- LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
- localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
-
- try {
- if (CommonStartup.authflag == 1) {
- retkey = NsaBaseEndpoint.getAuthenticatedUser(ctx);
- }
- } catch (NullPointerException x) {
- log.info(
- "Invalid user request " + ctx.request().getContentType() + MESSAGE
- + jsonObject);
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Unauthorized user" + x);
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Invalid user");
- return;
- }
-
- if (retkey != null || CommonStartup.authflag == 0) {
- if (CommonStartup.schema_Validatorflag > 0) {
-
- //fr = new FileReader(CommonStartup.schemaFile);
- fr = new FileReader(schemaFileVersion(vesVersion));
- String schema = new JsonParser().parse(fr).toString();
-
- valresult = CommonStartup.schemavalidate(jsonObject.toString(), schema);
- if ("true".equals(valresult)) {
- log.info("Validation successful");
- } else if ("false".equals(valresult)) {
- log.info("Validation failed");
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,
- "Schema validation failed");
-
- return;
- } else {
- log.error("Validation errored" + valresult);
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,
- "Couldn't parse JSON object");
- return;
-
- }
-
- if (arrayFlag == 1) {
- jsonArray = jsonObject.getJSONArray("eventList");
- log.info("Validation successful for all events in batch");
- for (int i = 0; i < jsonArray.length(); i++) {
- event = new JSONObject().put("event", jsonArray.getJSONObject(i));
- event.put("VESuniqueId", uuid + "-" + i);
- event.put("VESversion", vesVersion);
- jsonArrayMod.put(event);
- }
-
- log.info("Modified jsonarray:" + jsonArrayMod);
-
- } else {
-
- jsonObject.put("VESuniqueId", uuid);
- jsonObject.put("VESversion", vesVersion);
- jsonArrayMod = new JSONArray().put(jsonObject);
- }
-
- }
- // reject anything that's not JSON
- if (!ctx.request().getContentType().equalsIgnoreCase("application/json")) {
- log.info(String.format("Rejecting request with content type %s Message:%s",
- ctx.request().getContentType(), jsonObject));
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,
- "Incorrect message content-type; only accepts application/json messages");
- return;
- }
-
- CommonStartup.handleEvents(jsonArrayMod);
- } else {
- log.info(
- String.format("Unauthorized request %s%s%s", ctx.request().getContentType(),
- MESSAGE, jsonObject));
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized,
- "Unauthorized user");
- return;
- }
- } catch (JSONException | NullPointerException | IOException x) {
- log.error(String
- .format("Couldn't parse JSON Array - HttpStatusCodes.k400_badRequest%d%s%s",
- HttpStatusCodes.k400_badRequest, MESSAGE, x.getMessage()));
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request " + x);
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,
- "Couldn't parse JSON object");
- return;
- } catch (QueueFullException e) {
- log.error("Collector internal queue full :" + e.getMessage(), e);
- CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e);
- respondWithCustomMsginJson(ctx, HttpStatusCodes.k503_serviceUnavailable, "Queue full");
- return;
- } finally {
- if (fr != null) {
- safeClose(fr);
- }
-
- if (istr != null) {
- safeClose(istr);
- }
- }
- log.info("MessageAccepted and k200_ok to be sent");
- ctx.response()
- .sendErrorAndBody(HttpStatusCodes.k200_ok, "Message Accepted", MimeTypes.kAppJson);
- }
-
-
- public static void respondWithCustomMsginJson(DrumlinRequestContext ctx, int sc, String msg) {
- String[] str;
- String exceptionType = "GeneralException";
-
- str = CustomExceptionLoader.LookupMap(String.valueOf(sc), msg);
- log.info("Post CustomExceptionLoader.LookupMap" + str);
-
- if (str != null) {
-
- if (str[0].matches("SVC")) {
- exceptionType = "ServiceException";
- } else if (str[1].matches("POL")) {
- exceptionType = "PolicyException";
- }
-
- JSONObject jb = new JSONObject().put("requestError",
- new JSONObject().put(exceptionType,
- new JSONObject().put("MessagID", str[0]).put("text", str[1])));
-
- log.debug("Constructed json error : " + jb);
- ctx.response().sendErrorAndBody(sc, jb.toString(), MimeTypes.kAppJson);
- } else {
- JSONObject jb = new JSONObject().put("requestError",
- new JSONObject()
- .put(exceptionType, new JSONObject().put("Status", sc).put("Error", msg)));
- ctx.response().sendErrorAndBody(sc, jb.toString(), MimeTypes.kAppJson);
- }
-
- }
-
- public static void safeClose(FileReader fr) {
- if (fr != null) {
- try {
- fr.close();
- } catch (IOException e) {
- log.error("Error closing file reader stream : " + e);
- }
- }
-
- }
-
- public static void safeClose(InputStream is) {
- if (is != null) {
- try {
- is.close();
- } catch (IOException e) {
- log.error("Error closing Input stream : " + e);
- }
- }
-
- }
-
- public static String schemaFileVersion(String version) {
- String filename = null;
-
- if (CommonStartup.schemaFileJson.has(version)) {
- filename = CommonStartup.schemaFileJson.getString(version);
- } else {
- filename = CommonStartup.schemaFile;
- }
- log.info(String.format("VESversion: %s Schema File:%s", version, filename));
- return filename;
-
- }
-
-}
+/*-
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.restapi.endpoints;
+
+import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;
+import com.att.nsa.clock.SaClock;
+import com.att.nsa.drumlin.service.framework.context.DrumlinRequestContext;
+import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
+import com.att.nsa.drumlin.service.standards.MimeTypes;
+import com.att.nsa.logging.LoggingContext;
+import com.att.nsa.logging.log4j.EcompFields;
+import com.att.nsa.security.db.simple.NsaSimpleApiKey;
+import com.google.gson.JsonParser;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.Base64;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.onap.dcae.commonFunction.CommonStartup;
+import org.onap.dcae.commonFunction.CommonStartup.QueueFullException;
+import org.onap.dcae.commonFunction.VESLogger;
+import org.onap.dcae.restapi.ApiException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EventReceipt extends NsaBaseEndpoint {
+
+ private static final Logger log = LoggerFactory.getLogger(EventReceipt.class);
+ private static final String MESSAGE = " Message:";
+
+ public static void receiveVESEvent(DrumlinRequestContext ctx) {
+ // the request body carries events. assume for now it's an array
+ // of json objects that fits in memory. (See cambria's parsing for
+ // handling large messages)
+
+ NsaSimpleApiKey retkey = null;
+
+
+ JSONObject jsonObject;
+ InputStream istr = null;
+ int arrayFlag = 0;
+ String vesVersion = null;
+ String userId=null;
+
+ try {
+
+
+ istr = ctx.request().getBodyStream();
+ jsonObject = new JSONObject(new JSONTokener(istr));
+
+ log.info("ctx getPathInContext: " + ctx.request().getPathInContext());
+ Pattern p = Pattern.compile("(v\\d+)");
+ Matcher m = p.matcher(ctx.request().getPathInContext());
+
+ if (m.find()) {
+ log.info("VES version:" + m.group());
+ vesVersion = m.group();
+ }
+
+ final UUID uuid = UUID.randomUUID();
+ LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
+ localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
+
+ if (ctx.request().getPathInContext().contains("eventBatch")) {
+ CommonStartup.inlog.info(ctx.request().getRemoteAddress() + "VESUniqueID-Prefix:" + uuid
+ + " VES Batch Input Messsage: " + jsonObject);
+ log.info(ctx.request().getRemoteAddress() + "VESUniqueID-Prefix:" + uuid + " VES Batch Input Messsage: "
+ + jsonObject);
+ arrayFlag = 1;
+ } else {
+ CommonStartup.inlog.info(
+ ctx.request().getRemoteAddress() + "VESUniqueID:" + uuid + " Input Messsage: " + jsonObject);
+ log.info(ctx.request().getRemoteAddress() + "VESUniqueID:" + uuid + " Input Messsage: " + jsonObject);
+
+ }
+
+ try {
+ if (CommonStartup.authflag) {
+ userId = getUser (ctx);
+ retkey = NsaBaseEndpoint.getAuthenticatedUser(ctx);
+ }
+ } catch (NullPointerException x) {
+ //log.info("Invalid user request :" + userId + " FROM " + ctx.request().getRemoteAddress() + " " + ctx.request().getContentType() + MESSAGE + jsonObject);
+ log.info(String.format("Unauthorized request %s FROM %s %s %s %s", getUser(ctx), ctx.request().getRemoteAddress(), ctx.request().getContentType(), MESSAGE, jsonObject));
+ CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Unauthorized user" + userId + x);
+ respondWithCustomMsginJson(ctx, ApiException.UNAUTHORIZED_USER);
+ return;
+ }
+
+ if (schemaCheck(retkey, arrayFlag, jsonObject, vesVersion, ctx, uuid)) {
+ return;
+ }
+
+ } catch (JSONException | NullPointerException | IOException x) {
+ log.error(String.format("Couldn't parse JSON Array - HttpStatusCodes.k400_badRequest%d%s%s",
+ HttpStatusCodes.k400_badRequest, MESSAGE, x.getMessage()));
+ CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request " + x);
+ respondWithCustomMsginJson(ctx, ApiException.INVALID_JSON_INPUT);
+ return;
+ } catch (QueueFullException e) {
+ log.error("Collector internal queue full :" + e.getMessage(), e);
+ CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e);
+ respondWithCustomMsginJson(ctx, ApiException.NO_SERVER_RESOURCES);
+ return;
+ } finally {
+ if (istr != null) {
+ safeClose(istr);
+ }
+ }
+ log.info("MessageAccepted and k200_ok to be sent");
+ ctx.response().sendErrorAndBody(HttpStatusCodes.k200_ok, "Message Accepted", MimeTypes.kAppJson);
+ }
+
+
+ private static String getUser(DrumlinRequestContext ctx){
+ String authorization = ctx.request().getFirstHeader("Authorization");
+ if (authorization != null && authorization.startsWith("Basic")) {
+ String base64Credentials = authorization.substring("Basic".length()).trim();
+ String credentials = new String(Base64.getDecoder().decode(base64Credentials),
+ Charset.forName("UTF-8"));
+ final String[] values = credentials.split(":",2);
+ log.debug("User:" + values[0] + " Pwd:" + values[1]);
+ return values[0];
+ }
+ return null;
+
+ }
+
+ private static Boolean schemaCheck(NsaSimpleApiKey retkey, int arrayFlag,
+ JSONObject jsonObject, String vesVersion,
+ DrumlinRequestContext ctx, UUID uuid)
+ throws JSONException, QueueFullException, IOException {
+
+ JSONArray jsonArray;
+ JSONArray jsonArrayMod = new JSONArray();
+ JSONObject event;
+ FileReader fr;
+ if (retkey != null || !CommonStartup.authflag) {
+ if (CommonStartup.schemaValidatorflag) {
+ if ((arrayFlag == 1) && (jsonObject.has("eventList") && (!jsonObject.has("event")))
+ || ((arrayFlag == 0) && (!jsonObject.has("eventList") && (jsonObject.has("event"))))) {
+ fr = new FileReader(schemaFileVersion(vesVersion));
+ String schema = new JsonParser().parse(fr).toString();
+
+ String valresult = CommonStartup.validateAgainstSchema(jsonObject.toString(), schema);
+ switch (valresult) {
+ case "true":
+ log.info("Validation successful");
+ break;
+ case "false":
+ log.info("Validation failed");
+ respondWithCustomMsginJson(ctx, ApiException.SCHEMA_VALIDATION_FAILED);
+ return true;
+ default:
+ log.error("Validation errored" + valresult);
+ respondWithCustomMsginJson(ctx, ApiException.INVALID_JSON_INPUT);
+ return true;
+ }
+ } else {
+ log.info("Validation failed");
+ respondWithCustomMsginJson(ctx, ApiException.SCHEMA_VALIDATION_FAILED);
+ return true;
+ }
+ if (arrayFlag == 1) {
+ jsonArray = jsonObject.getJSONArray("eventList");
+ log.info("Validation successful for all events in batch");
+ for (int i = 0; i < jsonArray.length(); i++) {
+ event = new JSONObject().put("event", jsonArray.getJSONObject(i));
+ event.put("VESuniqueId", uuid + "-" + i);
+ event.put("VESversion", vesVersion);
+ jsonArrayMod.put(event);
+ }
+ log.info("Modified jsonarray:" + jsonArrayMod.toString());
+ } else {
+ jsonObject.put("VESuniqueId", uuid);
+ jsonObject.put("VESversion", vesVersion);
+ jsonArrayMod = new JSONArray().put(jsonObject);
+ }
+ }
+
+ // reject anything that's not JSON
+ if (!ctx.request().getContentType().equalsIgnoreCase("application/json")) {
+ log.info(String.format("Rejecting request with content type %s Message:%s",
+ ctx.request().getContentType(), jsonObject));
+ respondWithCustomMsginJson(ctx, ApiException.INVALID_CONTENT_TYPE);
+ return true;
+ }
+
+ CommonStartup.handleEvents(jsonArrayMod);
+ } else {
+ log.info(String.format("Unauthorized request %s FROM %s %s %s %s", getUser(ctx), ctx.request().getRemoteAddress(), ctx.request().getContentType(), MESSAGE,
+ jsonObject));
+ respondWithCustomMsginJson(ctx, ApiException.UNAUTHORIZED_USER);
+ return true;
+ }
+ return false;
+ }
+
+ private static void respondWithCustomMsginJson(DrumlinRequestContext ctx, ApiException apiException) {
+ ctx.response()
+ .sendErrorAndBody(apiException.httpStatusCode,
+ apiException.toJSON().toString(), MimeTypes.kAppJson);
+ }
+
+ private static void safeClose(InputStream is) {
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException e) {
+ log.error("Error closing Input stream : " + e);
+ }
+ }
+
+ }
+
+ public static String schemaFileVersion(String version) {
+ return CommonStartup.schemaFileJson.has(version) ?
+ CommonStartup.schemaFileJson.getString(version) : CommonStartup.schemaFileJson.getString("v5");
+ }
+
+}
+
diff --git a/src/main/resources/seclogger.yaml b/src/main/resources/seclogger.yaml
index b5dd177c..aeac4270 100644
--- a/src/main/resources/seclogger.yaml
+++ b/src/main/resources/seclogger.yaml
@@ -1,3 +1,20 @@
+# ================================================================================
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
package-name: org.onap.dcae.ecomplog
java-root: src/main/java
@@ -62,4 +79,4 @@ operations:
SECPublishOperation:
description: SEC Publish
SECAuthenticationOperation:
- description: SEC Authentication \ No newline at end of file
+ description: SEC Authentication
diff --git a/src/main/scripts/VESConfigPoller.sh b/src/main/scripts/VESConfigPoller.sh
new file mode 100644
index 00000000..75c2b585
--- /dev/null
+++ b/src/main/scripts/VESConfigPoller.sh
@@ -0,0 +1,125 @@
+#!/bin/sh -x
+###
+# ============LICENSE_START=======================================================
+# PROJECT
+# ================================================================================
+# Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+# redirect stdout/stderr to a file
+#exec &> /opt/app/VESCollector/logs/console.txt
+
+usage() {
+ echo "VESConfigPoller.sh"
+}
+
+
+## Remove singel execution logic (loop 0)
+## On configupdate function, remove LoadDynamicConfig and invoke VESrestfulCollector stop/start
+
+BASEDIR=/opt/app/VESCollector/
+CONFIGFILENAME=/opt/app/KV-Configuration.json
+
+
+collector_configupdate() {
+
+ echo `date +"%Y%m%d.%H%M%S%3N"` - VESConfigPoller.sh:collector_configupdate
+ if [ -z "$CONSUL_HOST" ] || [ -z "$CONFIG_BINDING_SERVICE" ] || [ -z "$HOSTNAME" ]; then
+ echo "INFO: USING STANDARD CONTROLLER CONFIGURATION"
+ else
+ # move into base directory
+ cd $BASEDIR
+
+ CONFIG_FETCH=org.onap.dcae.controller.FetchDynamicConfig
+ $JAVA -cp "etc${PATHSEP}lib/*" $CONFIG_FETCH $*
+ if [ $? -ne 0 ]; then
+ echo "ERROR: Failed to fetch dynamic configuration from consul into container $CONFIGFILENAME"
+ else
+ echo "INFO: Dynamic config fetched successfully"
+ fi
+ sleep 10s
+ FLAG=0
+
+ if [ -f $CONFIGFILENAME ]; then
+ if [[ $(find $CONFIGFILENAME -mmin -$CBSPOLLTIMER -print) ]]; then
+ echo "File $CONFIGFILENAME is updated under $CBSPOLLTIMER minutes; Loader to be invoked"
+ FLAG=1
+ else
+ echo "File $CONFIGFILENAME NOT updated in last $CBSPOLLTIMER minutes; no configuration update!"
+ FLAG=0
+ fi
+
+ if [ $FLAG -eq 1 ]; then
+ echo "INFO: CONFIGFILE updated; triggering restart"
+ /opt/app/VESCollector/bin/VESrestfulCollector.sh stop
+ /opt/app/VESCollector/bin/VESrestfulCollector.sh start &
+ else
+ echo "INFO: CONFIGFILE load skipped"
+ fi
+ else
+ echo "ERROR: Configuration file $CONFIGFILENAME missing"
+ fi
+ fi
+}
+
+
+
+if [ -z "$CBSPOLLTIMER" ]; then
+ echo "CBSPOLLTIMER not set; set this to polling frequency in minutes"
+ exit 1
+fi
+
+
+## Pre-setting
+
+# use JAVA_HOME if provided
+if [ -z "$JAVA_HOME" ]; then
+ echo "ERROR: JAVA_HOME not setup"
+ echo "Startup Aborted!!"
+ exit 1
+ #JAVA=java
+else
+ JAVA=$JAVA_HOME/bin/java
+fi
+
+
+
+# determine a path separator that works for this platform
+PATHSEP=":"
+case "$(uname -s)" in
+
+ Darwin)
+ ;;
+
+ Linux)
+ ;;
+
+ CYGWIN*|MINGW32*|MSYS*)
+ PATHSEP=";"
+ ;;
+
+ *)
+ ;;
+esac
+
+
+
+##Run in loop the config pull and check
+while true
+do
+ sleep $(echo $CBSPOLLTIMER)m
+ collector_configupdate | tee -a ${BASEDIR}/logs/console.txt
+done
+
diff --git a/src/main/scripts/VESrestfulCollector.sh b/src/main/scripts/VESrestfulCollector.sh
index fc6cd22f..8462f4e2 100644
--- a/src/main/scripts/VESrestfulCollector.sh
+++ b/src/main/scripts/VESrestfulCollector.sh
@@ -1,173 +1,102 @@
-#!/bin/sh
-
-###
-# ============LICENSE_START=======================================================
-# PROJECT
-# ================================================================================
-# Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-###
-
-# redirect stdout/stderr to a file
-#exec &> /opt/app/VESCollector/logs/console.txt
-
-usage() {
- echo "VESrestfulCollector.sh <start/stop>"
-}
-
-
-#BASEDIR=/opt/app/d1gfp1m7/extra/VES/VESCollector-1.1.4-SNAPSHOT/
-BASEDIR=/opt/app/VESCollector/
-
-collector_start() {
- echo `date +"%Y%m%d.%H%M%S%3N"` - collector_start | tee -a ${BASEDIR}/logs/console.txt
- collectorPid=`pgrep -f org.onap.dcae.commonFunction`
-
- if [ ! -z "$collectorPid" ]; then
- echo "WARNING: VES Restful Collector already running as PID $collectorPid" | tee -a ${BASEDIR}/logs/console.txt
- echo "Startup Aborted!!!" | tee -a ${BASEDIR}/logs/console.txt
- exit 1
- fi
-
-
- # run java. The classpath is the etc dir for config files, and the lib dir
- # for all the jars.
- #cd /opt/app/VESCollector/
- cd ${BASEDIR}
- nohup $JAVA -cp "etc${PATHSEP}lib/*" $JAVA_OPTS -Dhttps.protocols=TLSv1.1,TLSv1.2 $MAINCLASS $* &
- if [ $? -ne 0 ]; then
- echo "VES Restful Collector has been started!!!" | tee -a ${BASEDIR}/logs/console.txt
- fi
-
-
-}
-
-collector_stop() {
- echo `date +"%Y%m%d.%H%M%S%3N"` - collector_stop
- collectorPid=`pgrep -f org.onap.dcae.commonFunction`
- if [ ! -z "$collectorPid" ]; then
- echo "Stopping PID $collectorPid"
-
- kill -9 $collectorPid
- sleep 5
- if [ ! "$(pgrep -f org.onap.dcae.commonFunction)" ]; then
- echo "VES Restful Collector has been stopped!!!"
- else
- echo "VES Restful Collector is being stopped!!!"
- fi
- else
- echo "WARNING: No VES Collector instance is currently running";
- exit 1
- fi
-
-
-}
-
-collector_configupdate() {
-
- echo `date +"%Y%m%d.%H%M%S%3N"` - collector_configupdate
- if [ -z "$CONSUL_HOST" ] || [ -z "$CONFIG_BINDING_SERVICE" ] || [ -z "$HOSTNAME" ]; then
- echo "INFO: USING STANDARD CONTROLLER CONFIGURATION"
- else
-
- echo "INFO: DYNAMIC CONFIG INTERFACE SUPPORTED"
- # move into base directory
-
- #BASEDIR=`dirname $0`
- #cd $BASEDIR/..
- cd /opt/app/VESCollector
-
- CONFIG_FETCH=org.onap.dcae.controller.FetchDynamicConfig
- $JAVA -cp "etc${PATHSEP}lib/*" $CONFIG_FETCH $*
- if [ $? -ne 0 ]; then
- echo "ERROR: Failed to fetch dynamic configuration from consul into container /opt/app/KV-Configuration.json"
- else
- echo "INFO: Dynamic config fetched and written successfully into container /opt/app/KV-Configuration.json"
- fi
-
-
- if [ -f /opt/app/KV-Configuration.json ]; then
-
- CONFIG_UPDATER=org.onap.dcae.controller.LoadDynamicConfig
- $JAVA -cp "etc${PATHSEP}lib/*" $CONFIG_UPDATER $*
- if [ $? -ne 0 ]; then
- echo "ERROR: Failed to update dynamic configuration into Application"
- else
- echo "INFO: Dynamic config updated successfully into VESCollector configuration!"
- fi
- else
- echo "ERROR: Configuration file /opt/app/KV-Configuration.json missing"
- fi
-
- fi
-}
-
-
-## Check usage
-if [ $# -ne 1 ]; then
- usage
- exit
-fi
-
-
-## Pre-setting
-
-# use JAVA_HOME if provided
-if [ -z "$JAVA_HOME" ]; then
- echo "ERROR: JAVA_HOME not setup"
- echo "Startup Aborted!!"
- exit 1
- #JAVA=java
-else
- JAVA=$JAVA_HOME/bin/java
-fi
-
-
-MAINCLASS=org.onap.dcae.commonFunction.CommonStartup
-
-# determine a path separator that works for this platform
-PATHSEP=":"
-case "$(uname -s)" in
-
- Darwin)
- ;;
-
- Linux)
- ;;
-
- CYGWIN*|MINGW32*|MSYS*)
- PATHSEP=";"
- ;;
-
- *)
- ;;
-esac
-
-
-
-
-case $1 in
- "start")
- collector_configupdate | tee -a ${BASEDIR}/logs/console.txt
- collector_start
- ;;
- "stop")
- collector_stop | tee -a ${BASEDIR}/logs/console.txt
- ;;
- *)
- usage
- ;;
-esac
-
+#!/bin/bash
+
+###
+# ============LICENSE_START=======================================================
+# PROJECT
+# ================================================================================
+# Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (C) 2018 Nokia Networks Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+source bin/logger.sh
+
+start() {
+ log "Starting application"
+ appPids=`pidof java`
+
+ if [ ! -z ${appPids} ]; then
+ logWarn "Tried to start an application, but it is already running on PID(s): ${appPids}. Startup aborted."
+ exit 1
+ fi
+
+ ${JAVA_HOME}/bin/java -cp "etc:lib/*" \
+ -Xms256m -Xmx512m \
+ -XX:ErrorFile=logs/java_error%p.log \
+ -XX:+HeapDumpOnOutOfMemoryError \
+ -Dhttps.protocols=TLSv1.1,TLSv1.2 \
+ org.onap.dcae.commonFunction.CommonStartup $* & &>> logs/collector.log
+}
+
+stop() {
+ log "Stopping application"
+ appPids=`pidof java`
+
+ if [ ! -z ${appPids} ]; then
+ echo "Killing java PID(s): ${appPids}"
+ kill -9 ${appPids}
+ sleep 5
+ if [ ! $(pidof java) ]; then
+ log "Application stopped"
+ else
+ logWarn "Application did not stop after 5 seconds"
+ fi
+ else
+ logWarn "Tried to stop an application, but it was not running";
+ fi
+}
+
+collector_configupdate() {
+ if [ -z ${CONSUL_HOST} ] || [ -z ${CONFIG_BINDING_SERVICE} ] || [ -z ${HOSTNAME} ]; then
+ log "Using standard controller configuration (no dynamic configuration done)"
+ else
+ ${JAVA_HOME}/bin/java -cp "etc:lib/*" org.onap.dcae.controller.FetchDynamicConfig $*
+
+ if [ $? -ne 0 ]; then
+ logWarn "Failed to fetch dynamic configuration from consul into container /opt/app/KV-Configuration.json"
+ else
+ log "Dynamic config fetched and written successfully into container /opt/app/KV-Configuration.json"
+ fi
+
+ if [ -f /opt/app/KV-Configuration.json ]; then
+ ${JAVA_HOME}/bin/java -cp "etc:lib/*" org.onap.dcae.controller.LoadDynamicConfig $*
+ if [ $? -ne 0 ]; then
+ echo "ERROR: Failed to update dynamic configuration into Application"
+ else
+ echo "INFO: Dynamic config updated successfully into VESCollector configuration!"
+ fi
+ paramName="collector.keystore.alias"
+ localpropertyfile="etc/collector.properties"
+ tmpfile="etc/collector.properties.tmp"
+ keystore=`grep collector.keystore.file.location $localpropertyfile | tr -d '[:space:]' | cut -d"=" -f2`
+ keypwdfile=`grep collector.keystore.passwordfile $localpropertyfile | tr -d '[:space:]' | cut -d"=" -f2`
+ echo "/usr/bin/keytool -list -keystore $keystore < $keypwdfile | grep "PrivateKeyEntry" | cut -d"," -f1"
+ tmpalias=`/usr/bin/keytool -list -keystore $keystore < $keypwdfile | grep "PrivateKeyEntry" | cut -d"," -f1`
+ alias=`echo $tmpalias | cut -d":" -f2`
+ sed "s~$paramName=.*~$paramName=$alias~g" $localpropertyfile > $tmpfile
+ echo `cat $tmpfile > $localpropertyfile`
+ rm $tmpfile
+ log "Keystore alias updated"
+ else
+ logWarn "Configuration file /opt/app/KV-Configuration.json missing"
+ fi
+ fi
+}
+
+case $1 in
+ "start") collector_configupdate; start ;;
+ "stop") stop ;;
+ *) echo "Bad usage. Should be: /bin/bash <this> start/stop"
+esac
+
diff --git a/src/main/scripts/VESrestfulCollector_Status.sh b/src/main/scripts/VESrestfulCollector_Status.sh
deleted file mode 100644
index 47be7a14..00000000
--- a/src/main/scripts/VESrestfulCollector_Status.sh
+++ /dev/null
@@ -1,41 +0,0 @@
-###
-# ============LICENSE_START=======================================================
-# PROJECT
-# ================================================================================
-# Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-###
-
-#!/bin/sh
-
-#secPid=`pgrep -f com.att.dcae.commonFunction.CommonStartup` --> master
-secPid=`pgrep -f org.onap.dcae.commonFunction.CommonStartup`
-
-
-if [ "${secPid}" ]
-then
- #errorcnt = `grep -c "CambriaSimplerBatchPublisher - Send failed" ../logs/collector.log`
- errorcnt=`tail -1000 ../logs/collector.log | grep -c "CambriaSimplerBatchPublisher - Send failed"`
-
- if [ $errorcnt -gt 10 ]
- then
- echo "VESCollecter_Is_HavingError to publish"
- else
- echo "VESCollecter_Is_Running as PID $secPid"
- fi
-else
- echo "VESCollecter_Is_Not_Running"
-fi
-exit
diff --git a/src/main/scripts/docker-entry.sh b/src/main/scripts/docker-entry.sh
index 34cbe4cb..0aad7584 100644
--- a/src/main/scripts/docker-entry.sh
+++ b/src/main/scripts/docker-entry.sh
@@ -1,41 +1,66 @@
-#!/bin/sh
-###
-# ============LICENSE_START=======================================================
-# PROJECT
-# ================================================================================
-# Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-###
-
-if [ ! -z "$COLLECTOR_IP" ]; then
- echo $COLLECTOR_IP $(hostname).dcae.simpledemo.onap.org >> /etc/hosts
-fi
-
-if [ ! -z "$DMAAPHOST" ]; then
- echo $DMAAPHOST onap.dmaap.org >> /etc/hosts
-else
- echo "DMAAPHOST ENV NOT SET!! PUBLISH WILL NOT BE SUPPORTED"
-fi
-
-if [ -z "$CONSUL_HOST" ] || [ -z "$CONFIG_BINDING_SERVICE" ] || [ -z "$HOSTNAME" ]; then
- echo "INFO: USING STANDARD ALONE CONFIGURATION SETUP"
-else
- echo "INFO: USING DCAEGEN2 CONTROLLER"
-fi
-
-/opt/app/VESCollector/bin/VESrestfulCollector.sh stop
-/opt/app/VESCollector/bin/VESrestfulCollector.sh start &
-
-while true; do sleep 1000; done
+#!/bin/bash
+###
+# ============LICENSE_START=======================================================
+# PROJECT
+# ================================================================================
+# Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (C) 2018 Nokia Networks Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+source bin/logger.sh
+
+# Redirect all stdout & stderr to a main log file, but also let it print into the console
+# At the time this script is invoked, these directories and files do not exist yet, so we need to create them
+mkdir -p logs
+touch logs/collector.log
+exec &> >(tee -a logs/collector.log)
+
+log "Enabling log rotation for collector.log"
+loggedCommand "cp etc/logrotate.conf /etc/logrotate.d"
+echo "* * * * * root logrotate /etc/logrotate.conf" >> /etc/crontab
+log "Restarting cron"
+loggedCommand "service cron reload"
+loggedCommand "service cron start"
+
+log "Main application entry-point invoked"
+
+if [ ! -z ${COLLECTOR_IP} ]; then
+ log "Collector ip (${COLLECTOR_IP}) (env var 'COLLECTOR_IP') found, adding entry to /etc/hosts"
+ echo ${COLLECTOR_IP} $(hostname).dcae.simpledemo.onap.org >> /etc/hosts
+fi
+
+if [ ! -z ${DMAAPHOST} ]; then
+ if [ -z "$(echo ${DMAAPHOST} | sed -e 's/[0-9\.]//g')" ]; then
+ log "DMaaP host (${DMAAPHOST}) (env var 'DMAAPHOST') found, adding entry to /etc/hosts"
+ echo "${DMAAPHOST} onap-dmaap" >> /etc/hosts
+ else
+ log "DMaaP host (${DMAAPHOST}) (env var 'DMAAPHOST') found, adding entry to /etc/host.aliases"
+ echo "onap-dmaap ${DMAAPHOST}" >> /etc/host.aliases
+ fi
+else
+ logWarn "DMaaP host (env var 'DMAAPHOST') is missing. Events will not be published to DMaaP"
+fi
+
+log "Scheduling application to be started, looping indefinitely to hold the docker process"
+bin/VESrestfulCollector.sh stop
+bin/VESrestfulCollector.sh start &
+
+# Add below if config polling should be enabled. More specific to K8 deployment in ONAP
+if [ ! -z ${CBSPOLLTIMER} ]; then
+ log "Configuration poll time (${CBSPOLLTIMER}) (env var 'CBSPOLLTIMER') found, enabling configuration polling from CBS"
+ bin/VESConfigPoller.sh &
+fi
+
+while true; do sleep 1000; done
diff --git a/src/main/scripts/logger.sh b/src/main/scripts/logger.sh
new file mode 100644
index 00000000..0c56aef0
--- /dev/null
+++ b/src/main/scripts/logger.sh
@@ -0,0 +1,58 @@
+#!/bin/bash
+###
+# ============LICENSE_START=======================================================
+# PROJECT
+# ================================================================================
+# Copyright (C) 2018 Nokia Networks Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+
+log() {
+ logMessage "INFO " "$1"
+}
+
+logWarn() {
+ logMessage "WARN " "$1"
+}
+
+logError() {
+ logMessage "ERROR" "$1"
+}
+
+# Mimics log4j formatter so log files are consistent
+logMessage() {
+ echo "[$(date -u +'%Y-%m-%d %H:%M:%S,%3N')][$1][$(printf "%-9s %s\n" "PID $$")][$0] - $2"
+}
+
+# Run command, catch all the stdout and stderr and based on whether it succeeded, take the output,
+# and log them using common formatter.
+# It is done, so that the log files could be consistent and not look like swiss cheese having
+# nicely formatted lines surrounded with raw command outputs
+# All log lines that are logged by those external comments are prepended with (ext process) so they
+# can be distinguished from hand-rolled messages
+loggedCommand() {
+ output=$($1 2>&1)
+ if [ ! -z "${output}" ]; then
+ if [ $? -eq 0 ]; then
+ while read -r line; do
+ log "(ext process) $line"
+ done <<< "$output"
+ else
+ while read -r line; do
+ logError "(ext process) $line"
+ done <<< "$output"
+ fi
+ fi
+} \ No newline at end of file
diff --git a/src/main/scripts/reconfigure.sh b/src/main/scripts/reconfigure.sh
index e8766d6f..b68ad000 100644
--- a/src/main/scripts/reconfigure.sh
+++ b/src/main/scripts/reconfigure.sh
@@ -5,6 +5,7 @@
# PROJECT
# ================================================================================
# Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (C) 2018 Nokia Networks Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,14 +20,13 @@
# limitations under the License.
# ============LICENSE_END=========================================================
###
+source bin/logger.sh
-
-if [ -z "$CONSUL_HOST" ] || [ -z "$CONFIG_BINDING_SERVICE" ] || [ -z "$HOSTNAME" ]; then
- echo "INFO: USING STANDARD CONTROLLER"
- /opt/app/manager/start-manager.sh
+if [ -z ${CONSUL_HOST} ] || [ -z ${CONFIG_BINDING_SERVICE} ] || [ -z ${HOSTNAME} ]; then
+ log "Using standard controller (start-manager.sh)"
+ /opt/app/manager/start-manager.sh
else
- echo "INFO: USING DCAEGEN2 CONTROLLER"
- /opt/app/VESCollector/bin/VESrestfulCollector.sh stop
- /opt/app/VESCollector/bin/VESrestfulCollector.sh start &
-fi
-#while true; do sleep 1000; done
+ log "Using DCAEGEN2 controller (VESrestfulCollector.sh)"
+ bin/VESrestfulCollector.sh stop
+ bin/VESrestfulCollector.sh start &
+fi \ No newline at end of file
diff --git a/src/main/scripts/run-dcae-controller-ves-collector-daemon.sh b/src/main/scripts/run-dcae-controller-ves-collector-daemon.sh
deleted file mode 100644
index 544bf3f6..00000000
--- a/src/main/scripts/run-dcae-controller-ves-collector-daemon.sh
+++ /dev/null
@@ -1,39 +0,0 @@
-#!/bin/bash
-
-###
-# ============LICENSE_START=======================================================
-# PROJECT
-# ================================================================================
-# Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-###
-
-
-IMAGE="dcae-controller-ves-collector"
-VER="latest"
-HOST=$(hostname)
-NAME="ves-collector-$HOST"
-HOST=$(hostname -f)
-
-HOST_VM_LOGDIR="/var/log/${HOST}-docker"
-
-CMD="/bin/bash"
-# remove the imate, interactive terminal, map exposed port
-set -x
-#docker run -d -p 8080:8080/tcp -p 8443:8443/tcp -P --name ${NAME} ${IMAGE}:${VER}
-#docker run -td --name ${NAME} ${IMAGE}:${VER} ${CMD}
-#docker run -td --name ${NAME} ${IMAGE}:${VER}
-docker run -td -p 8080:8080/tcp -p 8443:8443/tcp -P --name ${NAME} ${IMAGE}:${VER} ${CMD}
-
diff --git a/src/main/scripts/run-dcae-controller-ves-collector-interactive.sh b/src/main/scripts/run-dcae-controller-ves-collector-interactive.sh
deleted file mode 100644
index e5759b10..00000000
--- a/src/main/scripts/run-dcae-controller-ves-collector-interactive.sh
+++ /dev/null
@@ -1,39 +0,0 @@
-#!/bin/bash
-
-###
-# ============LICENSE_START=======================================================
-# PROJECT
-# ================================================================================
-# Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-###
-
-
-IMAGE="dcae-controller-ves-collector"
-VER="latest"
-HOST=$(hostname)
-NAME="ves-collector-$HOST"
-HOST=$(hostname -f)
-CMD="/bin/bash"
-
-HOST_VM_LOGDIR="/var/log/${HOST}-docker-${IMAGE}"
-
-# remove the imate, interactive terminal, map exposed port
-set -x
-docker run --rm -it -v ${HOST_VM_LOGDIR}/manager_ves-collector:/opt/app/manager/logs \
- -v ${HOST_VM_LOGDIR}/VEScollector:/opt/app/VEScollector/logs \
- -v /etc/dcae:/etc/dcae \
- --name ${NAME} ${IMAGE}:${VER} \
- ${CMD}