aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/AnyNode.java157
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/CambriaPublisherFactory.java64
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/CommonStartup.java373
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java143
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/DmaapPublishers.java91
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/EventProcessor.java7
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java85
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java108
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java99
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java62
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java124
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/EventPublisher.java38
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java98
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java51
-rw-r--r--src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java144
-rw-r--r--src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java35
-rw-r--r--src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java4
-rw-r--r--src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParserTest.java114
-rw-r--r--src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisherTest.java (renamed from src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java)47
-rw-r--r--src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCacheTest.java126
-rw-r--r--src/test/java/org/onap/dcae/vestest/AnyNodeTest.java80
-rw-r--r--src/test/java/org/onap/dcae/vestest/DmaapPropertyReaderTest.java120
-rw-r--r--src/test/resources/testDmaapConfig_gen2.json24
-rw-r--r--src/test/resources/testFullDmaapConfig_channels.json13
-rw-r--r--src/test/resources/testParseDMaaPCredentialsGen2.json21
-rw-r--r--src/test/resources/testParseDMaaPCredentialsLegacy.json26
-rw-r--r--src/test/resources/testParseDMaaPGen2.json12
-rw-r--r--src/test/resources/testParseDMaaPLegacy.json21
28 files changed, 1183 insertions, 1104 deletions
diff --git a/src/main/java/org/onap/dcae/commonFunction/AnyNode.java b/src/main/java/org/onap/dcae/commonFunction/AnyNode.java
index 267c87a9..97d73ddd 100644
--- a/src/main/java/org/onap/dcae/commonFunction/AnyNode.java
+++ b/src/main/java/org/onap/dcae/commonFunction/AnyNode.java
@@ -19,164 +19,95 @@
*/
package org.onap.dcae.commonFunction;
+import static io.vavr.API.Set;
+
+import io.vavr.collection.List;
+import io.vavr.collection.Set;
+import io.vavr.control.Option;
+import java.util.stream.StreamSupport;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
-import org.json.JSONTokener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
/**
- * This class is a wrapper for 2 most used entities of org.json lib: JSONArray and JSONObject and
- * comprises utility methods for fast access of json structures without need to explicitly coerce between them.
- * While using this, bear in mind it does not contain exception handling - it is assumed that when using, the parsed json structure is known.
+ * This class is a wrapper for 2 most used entities of org.json lib: JSONArray and JSONObject and comprises utility
+ * methods for fast access of json structures without need to explicitly coerce between them. While using this, bear in
+ * mind it does not contain exception handling - it is assumed that when using, the parsed json structure is known.
*
* @author koblosz
*/
public class AnyNode {
- private final Object obj;
- private static final Logger log = LoggerFactory.getLogger(AnyNode.class);
- public static AnyNode parse(String filePath) throws IOException {
- try (FileReader fr = new FileReader(filePath)) {
- return new AnyNode(new JSONObject(new JSONTokener(fr)));
- } catch (FileNotFoundException | JSONException e1) {
- log.error("Could not find or parse file under path %s due to: %s", filePath, e1.toString());
- e1.printStackTrace();
- throw e1;
- }
- }
+ private Object obj;
- /**
- * Returns key set of underlying object. It is assumed that underlying object is of type org.json.JSONObject.
- *
- * @return Set of string keys present in underlying JSONObject
- */
- public Set<String> getKeys() {
- return asJsonObject().keySet();
+ private AnyNode(Object object) {
+ this.obj = object;
}
- /**
- * Returns value associated with specified key wrapped with AnyValue object. It is assumed that this is of type org.json.JSONObject.
- *
- * @param key A key string
- * @return The AnyNode object associated with given key.
- */
- public AnyNode get(String key) {
- return new AnyNode(asJsonObject().get(key));
+ public static AnyNode fromString(String content) {
+ return new AnyNode(new JSONObject(content));
}
/**
- * Returns value under specified index wrapped with AnyValue object. It is assumed that this is of type org.json.JSONArray.
- *
- * @param idx An index of JSONArray
- * @return The AnyNode object associated with given index.
+ * Returns key set of underlying object. It is assumed that underlying object is of type org.json.JSONObject.
*/
- public AnyNode get(int idx) {
- return new AnyNode(asJsonArray().get(idx));
+ public Set<String> keys() {
+ return Set(asJsonObject().keySet().toArray(new String[]{}));
}
/**
- * Returns int assuming this can be coerced to int.
+ * Returns value associated with specified key wrapped with AnyValue object. It is assumed that this is of type
+ * org.json.JSONObject.
*/
- public int asInt() {
- return (int) this.obj;
+ public AnyNode get(String key) {
+ return new AnyNode(asJsonObject().get(key));
}
/**
- * Returns string representation of this. If it happens to have null, the value is treated as org.json.JSONObject.NULL and "null" string is returned then.
- *
- * @return A String
+ * Returns string representation of this. If it happens to have null, the value is treated as
+ * org.json.JSONObject.NULL and "null" string is returned then.
*/
- public String asString() {
- return this.obj != JSONObject.NULL ? (String) this.obj : JSONObject.NULL.toString();
- }
-
public String toString() {
return this.obj.toString();
}
/**
- * Converts underlying object to String-to-Object map. It is assumed that underlying object is of type org.json.JSONObject.
- *
- * @return A map.
+ * Returns optional of object under specified key, wrapped with AnyNode object.
+ * If underlying object is not of type org.json.JSONObject
+ * or underlying object has no given key
+ * or given key is null
+ * then Optional.empty will be returned.
*/
- public Map<String, Object> asRawMap() {
- return asJsonObject().toMap();
- }
-
- /**
- * Returns optional of object under specified key, wrapped with AnyNode object. If underlying object is not of type org.json.JSONObject, then Optional.empty will be returned.
- *
- * @param key A key string
- */
- public Optional<AnyNode> getAsOptional(String key) {
- AnyNode result = null;
+ public Option<AnyNode> getAsOption(String key) {
try {
- result = get(key);
- } catch (JSONException ignored) {
+ AnyNode value = get(key);
+ if (value.toString().equals("null")) {
+ return Option.none();
+ }
+ return Option.some(value);
+ } catch (JSONException ex) {
+ return Option.none();
}
- return Optional.ofNullable(result);
- }
-
- private JSONObject asJsonObject() {
- return (JSONObject) this.obj;
- }
-
- /**
- * Converts underlying object to map representation with map values wrapped with AnyNode object. It is assumed that underlying object is of type org.json.JSONObject.
- */
- public Map<String, AnyNode> asMap() {
- Map<String, AnyNode> map = new HashMap<>();
- getKeys().forEach(key -> map.put(key, get(key)));
- return map;
- }
-
- /**
- * Converts underlying object to map representation with map values wrapped with AnyNode object. It is assumed that underlying object is of type org.json.JSONObject.
- */
- public java.util.List<AnyNode> asList() {
- return asStream().collect(Collectors.toList());
}
/**
- * Converts this object to stream of underlying objects wrapped with AnyNode class. It is assumed that this is of type JSONArray.
+ * Converts underlying object to map representation with map values wrapped with AnyNode object. It is assumed that
+ * underlying object is of type org.json.JSONObject.
*/
- private Stream<AnyNode> asStream() {
- return StreamSupport.stream(((JSONArray) this.obj).spliterator(), false).map(AnyNode::new);
+ public List<AnyNode> toList() {
+ return List.ofAll(StreamSupport.stream(((JSONArray) this.obj).spliterator(), false).map(AnyNode::new));
}
/**
* Checks if specified key is present in this. It is assumed that this is of type JSONObject.
*/
- boolean hasKey(String key) {
- return getAsOptional(key).isPresent();
- }
-
- /**
- * Returns empty AnyNode (with null inside)
- */
- public static AnyNode nullValue() {
- return new AnyNode(JSONObject.NULL.toString());
+ public boolean has(String key) {
+ return !getAsOption(key).isEmpty();
}
- private JSONArray asJsonArray() {
- return (JSONArray) this.obj;
+ private JSONObject asJsonObject() {
+ return (JSONObject) this.obj;
}
- private AnyNode(Object object) {
- this.obj = object;
- }
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/CambriaPublisherFactory.java b/src/main/java/org/onap/dcae/commonFunction/CambriaPublisherFactory.java
deleted file mode 100644
index 79109c04..00000000
--- a/src/main/java/org/onap/dcae/commonFunction/CambriaPublisherFactory.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package org.onap.dcae.commonFunction;
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dcaegen2.collectors.ves
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018 Nokia. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-class CambriaPublisherFactory {
-
- private final static Logger log = LoggerFactory.getLogger(CambriaPublisherFactory.class);
-
- CambriaBatchingPublisher createCambriaPublisher(String streamId)
- throws MalformedURLException, GeneralSecurityException {
- String authpwd = null;
- DmaapPropertyReader reader = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile);
- Map<String, String> dMaaPProperties = reader.getDmaapProperties();
- String ueburl = dMaaPProperties.get(streamId + ".cambria.url");
-
- if (ueburl == null) {
- ueburl = dMaaPProperties.get(streamId + ".cambria.hosts");
- }
- String topic = reader.getKeyValue(streamId + ".cambria.topic");
- String authuser = reader.getKeyValue(streamId + ".basicAuthUsername");
-
- if (authuser != null) {
- authpwd = dMaaPProperties.get(streamId + ".basicAuthPassword");
- }
-
- if ((authuser != null) && (authpwd != null)) {
- log.debug(String.format("URL:%sTOPIC:%sAuthUser:%sAuthpwd:%s", ueburl, topic, authuser, authpwd));
- return new CambriaClientBuilders.PublisherBuilder().usingHosts(ueburl).onTopic(topic).usingHttps()
- .authenticatedByHttp(authuser, authpwd).logSendFailuresAfter(5)
- .build();
- } else {
- log.debug(String.format("URL:%sTOPIC:%s", ueburl, topic));
- return new CambriaClientBuilders.PublisherBuilder().usingHosts(ueburl).onTopic(topic)
- .logSendFailuresAfter(5)
- .build();
- }
- }
-}
diff --git a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java
index b60104ea..3469531e 100644
--- a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java
+++ b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -39,15 +39,6 @@ import com.github.fge.jsonschema.core.report.ProcessingMessage;
import com.github.fge.jsonschema.core.report.ProcessingReport;
import com.github.fge.jsonschema.main.JsonSchema;
import com.github.fge.jsonschema.main.JsonSchemaFactory;
-
-import org.apache.catalina.LifecycleException;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.onap.dcae.restapi.RestfulCollectorServlet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
@@ -58,185 +49,193 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.catalina.LifecycleException;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.onap.dcae.commonFunction.event.publishing.DMaaPConfigurationParser;
+import org.onap.dcae.commonFunction.event.publishing.EventPublisher;
+import org.onap.dcae.restapi.RestfulCollectorServlet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class CommonStartup extends NsaBaseEndpoint implements Runnable {
- private static final String KCONFIG = "c";
- private static final String KSETTING_PORT = "collector.service.port";
- private static final int KDEFAULT_PORT = 8080;
- private static final String KSETTING_SECUREPORT = "collector.service.secure.port";
- private static final int KDEFAULT_SECUREPORT = -1;
- private static final String KSETTING_KEYSTOREPASSFILE = "collector.keystore.passwordfile";
- private static final String KDEFAULT_KEYSTOREPASSFILE = "../etc/passwordfile";
- private static final String KSETTING_KEYSTOREFILE = "collector.keystore.file.location";
- private static final String KDEFAULT_KEYSTOREFILE = "../etc/keystore";
- private static final String KSETTING_KEYALIAS = "collector.keystore.alias";
- private static final String KDEFAULT_KEYALIAS = "tomcat";
- private static final String KSETTING_DMAAPCONFIGS = "collector.dmaapfile";
- private static final String[] KDEFAULT_DMAAPCONFIGS = new String[] { "/etc/DmaapConfig.json" };
- private static final String KSETTING_SCHEMAVALIDATOR = "collector.schema.checkflag";
- private static final int KDEFAULT_SCHEMAVALIDATOR = -1;
- private static final String KSETTING_SCHEMAFILE = "collector.schema.file";
- private static final String KDEFAULT_SCHEMAFILE = "{\"v5\":\"./etc/CommonEventFormat_28.3.json\"}";
- private static final String KSETTING_DMAAPSTREAMID = "collector.dmaap.streamid";
- private static final String KSETTING_AUTHFLAG = "header.authflag";
- private static final int KDEFAULT_AUTHFLAG = 0;
- private static final String KSETTING_EVENTTRANSFORMFLAG = "event.transform.flag";
- private static final int KDEFAULT_EVENTTRANSFORMFLAG = 1;
- private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");
- public static final Logger inlog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.input");
- static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.output");
- public static final Logger eplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.error");
-
- public static final String KSETTING_AUTHLIST = "header.authlist";
- static final int KDEFAULT_MAXQUEUEDEVENTS = 1024 * 4;
- public static int schemaValidatorflag = -1;
- public static int authflag = 1;
- static int eventTransformFlag = 1;
- public static JSONObject schemaFileJson;
- static String cambriaConfigFile;
- public static String streamID;
-
- static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;
- private static ApiServer fTomcatServer = null;
- private static final Logger log = LoggerFactory.getLogger(CommonStartup.class);
-
- private CommonStartup(rrNvReadable settings) throws loadException, IOException, rrNvReadable.missingReqdSetting {
- final List<ApiServerConnector> connectors = new LinkedList<>();
-
- if (settings.getInt(KSETTING_PORT, KDEFAULT_PORT) > 0) {
- connectors.add(new ApiServerConnector.Builder(settings.getInt(KSETTING_PORT, KDEFAULT_PORT)).secure(false)
- .build());
- }
-
- final int securePort = settings.getInt(KSETTING_SECUREPORT, KDEFAULT_SECUREPORT);
- final String keystoreFile = settings.getString(KSETTING_KEYSTOREFILE, KDEFAULT_KEYSTOREFILE);
- final String keystorePasswordFile = settings.getString(KSETTING_KEYSTOREPASSFILE, KDEFAULT_KEYSTOREPASSFILE);
- final String keyAlias = settings.getString(KSETTING_KEYALIAS, KDEFAULT_KEYALIAS);
-
- if (securePort > 0) {
- String keystorePassword = readFile(keystorePasswordFile);
- connectors.add(new ApiServerConnector.Builder(securePort).secure(true)
- .keystorePassword(keystorePassword).keystoreFile(keystoreFile).keyAlias(keyAlias).build());
-
- }
-
- schemaValidatorflag = settings.getInt(KSETTING_SCHEMAVALIDATOR, KDEFAULT_SCHEMAVALIDATOR);
- if (schemaValidatorflag > 0) {
- String schemaFile = settings.getString(KSETTING_SCHEMAFILE, KDEFAULT_SCHEMAFILE);
- schemaFileJson = new JSONObject(schemaFile);
-
- }
- authflag = settings.getInt(CommonStartup.KSETTING_AUTHFLAG, CommonStartup.KDEFAULT_AUTHFLAG);
- String[] currentConfigFile = settings.getStrings(KSETTING_DMAAPCONFIGS, KDEFAULT_DMAAPCONFIGS);
- cambriaConfigFile = currentConfigFile[0];
- streamID = settings.getString(KSETTING_DMAAPSTREAMID, null);
- eventTransformFlag = settings.getInt(KSETTING_EVENTTRANSFORMFLAG, KDEFAULT_EVENTTRANSFORMFLAG);
-
- fTomcatServer = new ApiServer.Builder(connectors, new RestfulCollectorServlet(settings)).encodeSlashes(true)
- .name("collector").build();
- }
-
- public static void main(String[] args) {
- try {
- final Map<String, String> argMap = NsaCommandLineUtil.processCmdLine(args, true);
- final String config = NsaCommandLineUtil.getSetting(argMap, KCONFIG, "collector.properties");
- final URL settingStream = DrumlinServlet.findStream(config, CommonStartup.class);
-
- final nvReadableStack settings = new nvReadableStack();
- settings.push(new nvPropertiesFile(settingStream));
- settings.push(new nvReadableTable(argMap));
-
- fProcessingInputQueue = new LinkedBlockingQueue<>(CommonStartup.KDEFAULT_MAXQUEUEDEVENTS);
-
- VESLogger.setUpEcompLogging();
-
- CommonStartup cs = new CommonStartup(settings);
-
- Thread commonStartupThread = new Thread(cs);
- commonStartupThread.start();
-
- EventProcessor ep = new EventProcessor();
- ExecutorService executor = Executors.newFixedThreadPool(20);
- for (int i = 0; i < 20; ++i) {
- executor.execute(ep);
- }
-
- } catch (loadException | missingReqdSetting | IOException e) {
- CommonStartup.eplog.error("FATAL_STARTUP_ERROR" + e.getMessage());
- throw new RuntimeException(e);
- } catch (Exception e) {
- System.err.println("Uncaught exception - " + e.getMessage());
- CommonStartup.eplog.error("FATAL_ERROR" + e.getMessage());
- e.printStackTrace(System.err);
- }
- }
-
- public void run() {
- try {
- fTomcatServer.start();
- fTomcatServer.await();
- } catch (LifecycleException | IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public static class QueueFullException extends Exception {
- private static final long serialVersionUID = 1L;
- }
-
- public static void handleEvents(JSONArray a) throws QueueFullException, JSONException {
- CommonStartup.metriclog.info("EVENT_PUBLISH_START");
- for (int i = 0; i < a.length(); i++) {
- if (!fProcessingInputQueue.offer(a.getJSONObject(i))) {
- throw new QueueFullException();
- }
- }
- log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
- CommonStartup.metriclog.info("EVENT_PUBLISH_END");
- }
-
- private static String readFile(String path) throws IOException {
- byte[] encoded = Files.readAllBytes(Paths.get(path));
- String pwd = new String(encoded);
- return pwd.substring(0, pwd.length() - 1);
- }
-
- public static String validateAgainstSchema(String jsonData, String jsonSchema) {
- ProcessingReport report;
- String result = "false";
-
- try {
- log.trace("Schema validation for event:" + jsonData);
- JsonNode schemaNode = JsonLoader.fromString(jsonSchema);
- JsonNode data = JsonLoader.fromString(jsonData);
- JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
- JsonSchema schema = factory.getJsonSchema(schemaNode);
- report = schema.validate(data);
- } catch (JsonParseException e) {
- log.error("validateAgainstSchema:JsonParseException for event:" + jsonData);
- return e.getMessage();
- } catch (ProcessingException e) {
- log.error("validateAgainstSchema:Processing exception for event:" + jsonData);
- return e.getMessage();
- } catch (IOException e) {
- log.error(
- "validateAgainstSchema:IO exception; something went wrong trying to read json data for event:" + jsonData);
- return e.getMessage();
- }
- if (report != null) {
- for (ProcessingMessage pm : report) {
- log.trace("Processing Message: " + pm.getMessage());
- }
- result = String.valueOf(report.isSuccess());
- }
- try {
- log.debug("Validation Result:" + result + " Validation report:" + report);
- } catch (NullPointerException e) {
- log.error("validateAgainstSchema:NullpointerException on report");
- }
- return result;
- }
+ private static final String KCONFIG = "c";
+ private static final String KSETTING_PORT = "collector.service.port";
+ private static final int KDEFAULT_PORT = 8080;
+ private static final String KSETTING_SECUREPORT = "collector.service.secure.port";
+ private static final int KDEFAULT_SECUREPORT = -1;
+ private static final String KSETTING_KEYSTOREPASSFILE = "collector.keystore.passwordfile";
+ private static final String KDEFAULT_KEYSTOREPASSFILE = "../etc/passwordfile";
+ private static final String KSETTING_KEYSTOREFILE = "collector.keystore.file.location";
+ private static final String KDEFAULT_KEYSTOREFILE = "../etc/keystore";
+ private static final String KSETTING_KEYALIAS = "collector.keystore.alias";
+ private static final String KDEFAULT_KEYALIAS = "tomcat";
+ private static final String KSETTING_DMAAPCONFIGS = "collector.dmaapfile";
+ private static final String[] KDEFAULT_DMAAPCONFIGS = new String[]{"/etc/DmaapConfig.json"};
+ private static final String KSETTING_SCHEMAVALIDATOR = "collector.schema.checkflag";
+ private static final int KDEFAULT_SCHEMAVALIDATOR = -1;
+ private static final String KSETTING_SCHEMAFILE = "collector.schema.file";
+ private static final String KDEFAULT_SCHEMAFILE = "{\"v5\":\"./etc/CommonEventFormat_28.3.json\"}";
+ private static final String KSETTING_DMAAPSTREAMID = "collector.dmaap.streamid";
+ private static final String KSETTING_AUTHFLAG = "header.authflag";
+ private static final int KDEFAULT_AUTHFLAG = 0;
+ private static final String KSETTING_EVENTTRANSFORMFLAG = "event.transform.flag";
+ private static final int KDEFAULT_EVENTTRANSFORMFLAG = 1;
+ private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");
+ public static final Logger inlog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.input");
+ static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.output");
+ public static final Logger eplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.error");
+
+ public static final String KSETTING_AUTHLIST = "header.authlist";
+ static final int KDEFAULT_MAXQUEUEDEVENTS = 1024 * 4;
+ public static int schemaValidatorflag = -1;
+ public static int authflag = 1;
+ static int eventTransformFlag = 1;
+ public static JSONObject schemaFileJson;
+ static String cambriaConfigFile;
+ public static String streamID;
+
+ static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;
+ private static ApiServer fTomcatServer = null;
+ private static final Logger log = LoggerFactory.getLogger(CommonStartup.class);
+
+ private CommonStartup(rrNvReadable settings) throws loadException, IOException, rrNvReadable.missingReqdSetting {
+ final List<ApiServerConnector> connectors = new LinkedList<>();
+
+ if (settings.getInt(KSETTING_PORT, KDEFAULT_PORT) > 0) {
+ connectors.add(new ApiServerConnector.Builder(settings.getInt(KSETTING_PORT, KDEFAULT_PORT)).secure(false)
+ .build());
+ }
+
+ final int securePort = settings.getInt(KSETTING_SECUREPORT, KDEFAULT_SECUREPORT);
+ final String keystoreFile = settings.getString(KSETTING_KEYSTOREFILE, KDEFAULT_KEYSTOREFILE);
+ final String keystorePasswordFile = settings.getString(KSETTING_KEYSTOREPASSFILE, KDEFAULT_KEYSTOREPASSFILE);
+ final String keyAlias = settings.getString(KSETTING_KEYALIAS, KDEFAULT_KEYALIAS);
+
+ if (securePort > 0) {
+ String keystorePassword = readFile(keystorePasswordFile);
+ connectors.add(new ApiServerConnector.Builder(securePort).secure(true)
+ .keystorePassword(keystorePassword).keystoreFile(keystoreFile).keyAlias(keyAlias).build());
+
+ }
+
+ schemaValidatorflag = settings.getInt(KSETTING_SCHEMAVALIDATOR, KDEFAULT_SCHEMAVALIDATOR);
+ if (schemaValidatorflag > 0) {
+ String schemaFile = settings.getString(KSETTING_SCHEMAFILE, KDEFAULT_SCHEMAFILE);
+ schemaFileJson = new JSONObject(schemaFile);
+
+ }
+ authflag = settings.getInt(CommonStartup.KSETTING_AUTHFLAG, CommonStartup.KDEFAULT_AUTHFLAG);
+ String[] currentConfigFile = settings.getStrings(KSETTING_DMAAPCONFIGS, KDEFAULT_DMAAPCONFIGS);
+ cambriaConfigFile = currentConfigFile[0];
+ streamID = settings.getString(KSETTING_DMAAPSTREAMID, null);
+ eventTransformFlag = settings.getInt(KSETTING_EVENTTRANSFORMFLAG, KDEFAULT_EVENTTRANSFORMFLAG);
+
+ fTomcatServer = new ApiServer.Builder(connectors, new RestfulCollectorServlet(settings)).encodeSlashes(true)
+ .name("collector").build();
+ }
+
+ public static void main(String[] args) {
+ try {
+ final Map<String, String> argMap = NsaCommandLineUtil.processCmdLine(args, true);
+ final String config = NsaCommandLineUtil.getSetting(argMap, KCONFIG, "collector.properties");
+ final URL settingStream = DrumlinServlet.findStream(config, CommonStartup.class);
+
+ final nvReadableStack settings = new nvReadableStack();
+ settings.push(new nvPropertiesFile(settingStream));
+ settings.push(new nvReadableTable(argMap));
+
+ fProcessingInputQueue = new LinkedBlockingQueue<>(CommonStartup.KDEFAULT_MAXQUEUEDEVENTS);
+
+ VESLogger.setUpEcompLogging();
+
+ CommonStartup cs = new CommonStartup(settings);
+
+ Thread commonStartupThread = new Thread(cs);
+ commonStartupThread.start();
+
+ EventProcessor ep = new EventProcessor(EventPublisher.createPublisher(oplog,
+ DMaaPConfigurationParser
+ .parseToDomainMapping(Paths.get(cambriaConfigFile))
+ .get()));
+ ExecutorService executor = Executors.newFixedThreadPool(20);
+ for (int i = 0; i < 20; ++i) {
+ executor.execute(ep);
+ }
+ } catch (Exception e) {
+ CommonStartup.eplog.error("Fatal error during application startup", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void run() {
+ try {
+ fTomcatServer.start();
+ fTomcatServer.await();
+ } catch (LifecycleException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static class QueueFullException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+ }
+
+ public static void handleEvents(JSONArray a) throws QueueFullException, JSONException {
+ CommonStartup.metriclog.info("EVENT_PUBLISH_START");
+ for (int i = 0; i < a.length(); i++) {
+ if (!fProcessingInputQueue.offer(a.getJSONObject(i))) {
+ throw new QueueFullException();
+ }
+ }
+ log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
+ CommonStartup.metriclog.info("EVENT_PUBLISH_END");
+ }
+
+ private static String readFile(String path) throws IOException {
+ byte[] encoded = Files.readAllBytes(Paths.get(path));
+ String pwd = new String(encoded);
+ return pwd.substring(0, pwd.length() - 1);
+ }
+
+ public static String validateAgainstSchema(String jsonData, String jsonSchema) {
+ ProcessingReport report;
+ String result = "false";
+
+ try {
+ log.trace("Schema validation for event:" + jsonData);
+ JsonNode schemaNode = JsonLoader.fromString(jsonSchema);
+ JsonNode data = JsonLoader.fromString(jsonData);
+ JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
+ JsonSchema schema = factory.getJsonSchema(schemaNode);
+ report = schema.validate(data);
+ } catch (JsonParseException e) {
+ log.error("validateAgainstSchema:JsonParseException for event:" + jsonData);
+ return e.getMessage();
+ } catch (ProcessingException e) {
+ log.error("validateAgainstSchema:Processing exception for event:" + jsonData);
+ return e.getMessage();
+ } catch (IOException e) {
+ log.error(
+ "validateAgainstSchema:IO exception; something went wrong trying to read json data for event:" + jsonData);
+ return e.getMessage();
+ }
+ if (report != null) {
+ for (ProcessingMessage pm : report) {
+ log.trace("Processing Message: " + pm.getMessage());
+ }
+ result = String.valueOf(report.isSuccess());
+ }
+ try {
+ log.debug("Validation Result:" + result + " Validation report:" + report);
+ } catch (NullPointerException e) {
+ log.error("validateAgainstSchema:NullpointerException on report");
+ }
+ return result;
+ }
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java b/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java
deleted file mode 100644
index 0ee1e434..00000000
--- a/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.commonFunction;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-public class DmaapPropertyReader {
-
-
- private static final Logger log = LoggerFactory.getLogger(DmaapPropertyReader.class);
- private static final String CAMBRIA_TOPIC_KEY = "cambria.topic";
- private static final String CAMBRIA_HOSTS_KEY = "cambria.hosts";
- private static final String CAMBRIA_URL_KEY = "cambria.url";
- private static final List<String> LEGACY_CHANNEL_MANDATORY_PARAMS = Lists.newArrayList(CAMBRIA_TOPIC_KEY, CAMBRIA_HOSTS_KEY, CAMBRIA_URL_KEY, "basicAuthPassword", "basicAuthUsername");
- private static DmaapPropertyReader instance = null;
- private final Map<String, String> dmaapProperties;
-
- public DmaapPropertyReader(String cambriaConfigFilePath) {
- this.dmaapProperties = DmaapPropertyReader.getProcessedDmaapProperties(cambriaConfigFilePath);
- }
-
- public Map<String, String> getDmaapProperties() {
- return dmaapProperties;
- }
-
- public static synchronized DmaapPropertyReader getInstance(String channelConfig) {
- if (instance == null) {
- instance = new DmaapPropertyReader(channelConfig);
- }
- return instance;
- }
-
- public String getKeyValue(String hashKey) {
- return dmaapProperties.get(hashKey);
- }
-
- private static Map<String, String> getProcessedDmaapProperties(String configFilePath) {
- Map<String, String> transformedDmaapProperties = new HashMap<>();
- try {
- AnyNode root = AnyNode.parse(configFilePath);
- if (isInLegacyFormat(root)) {
- transformedDmaapProperties = getLegacyDmaapPropertiesWithChannels(root.get("channels"));
- } else {
- transformedDmaapProperties = getDmaapPropertiesWithInfoData(root);
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- return transformedDmaapProperties;
- }
-
- private static boolean isInLegacyFormat(AnyNode root) {
- return root.hasKey("channels");
- }
-
- private static Map<String, String> getLegacyDmaapPropertiesWithChannels(AnyNode channelsNode) {
- return channelsNode.asList().stream()
- .map(DmaapPropertyReader::getTransformedMandatoryChannelProperties)
- .flatMap(m -> m.entrySet().stream())
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- }
-
- private static Map<String, String> getTransformedMandatoryChannelProperties(AnyNode channel) {
- String prefix = channel.get("name").asString() + ".";
- return channel.asMap().entrySet().stream().filter(el -> LEGACY_CHANNEL_MANDATORY_PARAMS.contains(el.getKey()) && !Objects.equals(el.getKey(), "name"))
- .collect(Collectors.toMap(k -> prefix + k.getKey(), v -> v.getValue().asString().replace("\"", "")));
- }
-
- private static Map<String, String> getDmaapPropertiesWithInfoData(AnyNode root) {
- return root.asMap().entrySet().stream()
- .map(DmaapPropertyReader::getTransformedMandatoryInfoProperties).flatMap(m -> m.entrySet().stream())
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- }
-
- private static Map<String, String> getTransformedMandatoryInfoProperties(Map.Entry<String, AnyNode> el) {
- String prefix = el.getKey() + ".";
- AnyNode val = el.getValue();
- Map<String, String> map = Maps.newHashMap();
- map.put(prefix + "basicAuthUsername", val.getAsOptional("aaf_username").orElse(AnyNode.nullValue()).asString().replace("\"", ""));
- map.put(prefix + "basicAuthPassword", val.getAsOptional("aaf_password").orElse(AnyNode.nullValue()).asString().replace("\"", ""));
- map.putAll(getParamsFromDmaapInfoTopicUrl(prefix, val.get("dmaap_info").get("topic_url").asString().replace("\"", "")));
- return map;
- }
-
- /***
- * Dmaap url structure pub - https://<dmaaphostname>:<port>/events/
- * <namespace>.<dmaapcluster>.<topic>, sub - https://<dmaaphostname>:
- * <port>/events/<namespace>.<dmaapcluster>.<topic>/G1/u1";
- *
- * Onap url structure pub - http://<dmaaphostname>:<port>/<unauthenticated>.
- * <topic>,
- */
- private static Map<String, String> getParamsFromDmaapInfoTopicUrl(String keyPrefix, String topicUrl) {
- Map<String, String> topicUrlParts = Maps.newHashMap();
- try {
- URL url = new URL(topicUrl);
- topicUrlParts.put(keyPrefix + CAMBRIA_URL_KEY, url.getAuthority());
- String[] pathParts = url.getPath().split("/");
- if (pathParts.length > 2 && "events".equals(pathParts[1])) {
- // DCAE internal dmaap topic convention
- topicUrlParts.put(keyPrefix + CAMBRIA_TOPIC_KEY, pathParts[2]);
- } else {
- // ONAP dmaap topic convention
- topicUrlParts.put(keyPrefix + CAMBRIA_TOPIC_KEY, pathParts[2]);
- topicUrlParts.put(keyPrefix + CAMBRIA_HOSTS_KEY, url.getHost());
- }
- } catch (MalformedURLException e) {
- log.error("Invalid URL found under topic_url key!", e);
- e.printStackTrace();
- }
- return topicUrlParts;
- }
-} \ No newline at end of file
diff --git a/src/main/java/org/onap/dcae/commonFunction/DmaapPublishers.java b/src/main/java/org/onap/dcae/commonFunction/DmaapPublishers.java
deleted file mode 100644
index a4c62719..00000000
--- a/src/main/java/org/onap/dcae/commonFunction/DmaapPublishers.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dcaegen2.collectors.ves
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018 Nokia. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.commonFunction;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-class DmaapPublishers {
-
- private static final Logger log = LoggerFactory.getLogger(DmaapPublishers.class);
- private final LoadingCache<String, CambriaBatchingPublisher> publishers;
-
- private DmaapPublishers(
- LoadingCache<String, CambriaBatchingPublisher> publishers) {
- this.publishers = publishers;
- }
-
- static DmaapPublishers create() {
- return create(new CambriaPublisherFactory());
- }
-
- static DmaapPublishers create(final CambriaPublisherFactory publisherFactory) {
- final LoadingCache<String, CambriaBatchingPublisher> cache = CacheBuilder.<String, CambriaBatchingPublisher>newBuilder()
- .removalListener((RemovalListener<String, CambriaBatchingPublisher>) notification -> {
- if (notification.getValue() != null) {
- onCacheItemInvalidated(notification.getValue());
- }
- })
- .build(new CacheLoader<String, CambriaBatchingPublisher>() {
- @Override
- public CambriaBatchingPublisher load(String streamId)
- throws MalformedURLException, GeneralSecurityException {
- try {
- return publisherFactory.createCambriaPublisher(streamId);
- } catch (MalformedURLException | GeneralSecurityException e) {
- log.error("CambriaClientBuilders connection reader exception : streamID - " + streamId + " "
- + e.getMessage());
- throw e;
- }
- }
- });
- return new DmaapPublishers(cache);
- }
-
- CambriaBatchingPublisher getByStreamId(String streamId) {
- return publishers.getUnchecked(streamId);
- }
-
- void closeByStreamId(String streamId) {
- publishers.invalidate(streamId);
- }
-
- private static void onCacheItemInvalidated(CambriaBatchingPublisher pub) {
- try {
- final List<?> stuck = pub.close(20, TimeUnit.SECONDS);
- if (!stuck.isEmpty()) {
- log.error(stuck.size() + " messages unsent");
- }
- } catch (InterruptedException | IOException e) {
- log.error("Caught Exception on Close event: {}", e);
- }
- }
-}
diff --git a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
index 06a27328..9d6ad360 100644
--- a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
+++ b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
@@ -27,6 +27,7 @@ import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import java.util.Map;
import org.json.JSONObject;
+import org.onap.dcae.commonFunction.event.publishing.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,8 +53,10 @@ class EventProcessor implements Runnable {
static Map<String, String[]> streamidHash = new HashMap<>();
public JSONObject event;
+ private EventPublisher eventPublisher;
- EventProcessor() {
+ public EventProcessor(EventPublisher eventPublisher) {
+ this.eventPublisher = eventPublisher;
streamidHash = parseStreamIdToStreamHashMapping(CommonStartup.streamID);
}
@@ -128,7 +131,7 @@ class EventProcessor implements Runnable {
for (String aStreamIdList : streamIdList) {
log.info("Invoking publisher for streamId:" + aStreamIdList);
this.overrideEvent();
- EventPublisherHash.getInstance().sendEvent(event, aStreamIdList);
+ eventPublisher.sendEvent(event, aStreamIdList);
}
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java b/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java
deleted file mode 100644
index f3907126..00000000
--- a/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dcaegen2.collectors.ves
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018 Nokia. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.commonFunction;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.clock.SaClock;
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.log4j.EcompFields;
-import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import org.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EventPublisherHash {
-
- private static final String VES_UNIQUE_ID = "VESuniqueId";
- private static final Logger log = LoggerFactory.getLogger(EventPublisherHash.class);
- private static volatile EventPublisherHash instance = new EventPublisherHash(DmaapPublishers.create());
- private final DmaapPublishers dmaapPublishers;
-
- public static EventPublisherHash getInstance() {
- return instance;
- }
-
- @VisibleForTesting
- EventPublisherHash(DmaapPublishers dmaapPublishers) {
- this.dmaapPublishers = dmaapPublishers;
- }
-
- void sendEvent(JSONObject event, String streamid) {
- log.debug("EventPublisher.sendEvent: instance for publish is ready");
- clearVesUniqueId(event);
-
- try {
- sendEventUsingCachedPublisher(streamid, event);
- } catch (IOException | IllegalArgumentException e) {
- log.error("Unable to publish event: {} streamID: {}. Exception: {}", event, streamid, e);
- dmaapPublishers.closeByStreamId(streamid);
- }
- }
-
- private void clearVesUniqueId(JSONObject event) {
- if (event.has(VES_UNIQUE_ID)) {
- String uuid = event.get(VES_UNIQUE_ID).toString();
- LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
- localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
- log.debug("Removing VESuniqueid object from event");
- event.remove(VES_UNIQUE_ID);
- }
- }
-
- private void sendEventUsingCachedPublisher(String streamid, JSONObject event) throws IOException {
- int pendingMsgs = dmaapPublishers.getByStreamId(streamid).send("MyPartitionKey", event.toString());
- if (pendingMsgs > 100) {
- log.info("Pending Message Count=" + pendingMsgs);
- }
- log.info("pub.send invoked - no error");
- CommonStartup.oplog.info(String.format("StreamID:%s Event Published:%s ", streamid, event));
- }
-
- @VisibleForTesting
- CambriaBatchingPublisher getDmaapPublisher(String streamId) {
- return dmaapPublishers.getByStreamId(streamId);
- }
-}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java
new file mode 100644
index 00000000..bef14448
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java
@@ -0,0 +1,108 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.commonFunction.event.publishing;
+
+import static io.vavr.API.List;
+import static io.vavr.API.Try;
+import static io.vavr.API.Tuple;
+import static io.vavr.API.unchecked;
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError;
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
+
+import io.vavr.collection.List;
+import io.vavr.collection.Map;
+import io.vavr.control.Option;
+import io.vavr.control.Try;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.onap.dcae.commonFunction.AnyNode;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+@SuppressWarnings("mapFailure takes a generic varargs, unchecked because of Javas type system limitation, actually safe to do")
+public final class DMaaPConfigurationParser {
+
+ public static Try<Map<String, PublisherConfig>> parseToDomainMapping(Path configLocation) {
+ return readFromFile(configLocation)
+ .flatMap(DMaaPConfigurationParser::toJSON)
+ .flatMap(DMaaPConfigurationParser::toConfigMap);
+ }
+
+ private static Try<String> readFromFile(Path configLocation) {
+ return Try(() -> new String(Files.readAllBytes(configLocation)))
+ .mapFailure(enhanceError(f("Could not read DMaaP configuration from location: '%s'", configLocation)));
+ }
+
+ private static Try<AnyNode> toJSON(String config) {
+ return Try(() -> AnyNode.fromString(config))
+ .mapFailure(enhanceError(f("DMaaP configuration '%s' is not a valid JSON document", config)));
+ }
+
+ private static Try<Map<String, PublisherConfig>> toConfigMap(AnyNode config) {
+ return Try(() -> usesLegacyFormat(config) ? parseLegacyFormat(config) : parseNewFormat(config))
+ .mapFailure(enhanceError(
+ f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config)));
+ }
+
+ private static boolean usesLegacyFormat(AnyNode dMaaPConfig) {
+ return dMaaPConfig.has("channels");
+ }
+
+ private static Map<String, PublisherConfig> parseLegacyFormat(AnyNode root) {
+ return root.get("channels").toList().toMap(
+ channel -> channel.get("name").toString(),
+ channel -> {
+ String destinationsStr = channel.getAsOption("cambria.url")
+ .getOrElse(channel.getAsOption("cambria.hosts").get())
+ .toString();
+ String topic = channel.get("cambria.topic").toString();
+ Option<String> maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString);
+ Option<String> maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString);
+ List<String> destinations = List(destinationsStr.split(","));
+ return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
+ });
+ }
+
+ private static Map<String, PublisherConfig> parseNewFormat(AnyNode root) {
+ return root.keys().toMap(
+ channelName -> channelName,
+ channelName -> {
+ AnyNode channelConfig = root.get(channelName);
+ Option<String> maybeUser = channelConfig.getAsOption("aaf_username").map(AnyNode::toString);
+ Option<String> maybePassword = channelConfig.getAsOption("aaf_password").map(AnyNode::toString);
+ URL topicURL = unchecked(
+ () -> new URL(channelConfig.get("dmaap_info").get("topic_url").toString())).apply();
+ String[] pathSegments = topicURL.getPath().substring(1).split("/");
+ String topic = pathSegments[1];
+ String destination = "events".equals(pathSegments[0]) ? topicURL.getAuthority() : topicURL.getHost();
+ List<String> destinations = List(destination);
+ return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
+ });
+ }
+
+ private static PublisherConfig buildBasedOnAuth(Option<String> maybeUser, Option<String> maybePassword,
+ String topic, List<String> destinations) {
+ return maybeUser.flatMap(user -> maybePassword.map(password -> Tuple(user, password)))
+ .map(credentials -> new PublisherConfig(destinations, topic, credentials._1, credentials._2))
+ .getOrElse(new PublisherConfig(destinations, topic));
+ }
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java
new file mode 100644
index 00000000..fd9b3ae1
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java
@@ -0,0 +1,99 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.commonFunction.event.publishing;
+
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.clock.SaClock;
+import com.att.nsa.logging.LoggingContext;
+import com.att.nsa.logging.log4j.EcompFields;
+import io.vavr.collection.Map;
+import io.vavr.control.Try;
+import java.io.IOException;
+import org.json.JSONObject;
+import org.onap.dcae.commonFunction.VESLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+class DMaaPEventPublisher implements EventPublisher {
+ private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100;
+ private static final String VES_UNIQUE_ID = "VESuniqueId";
+ private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class);
+ private final DMaaPPublishersCache publishersCache;
+ private final Logger outputLogger;
+
+ DMaaPEventPublisher(DMaaPPublishersCache DMaaPPublishersCache,
+ Logger outputLogger) {
+ this.publishersCache = DMaaPPublishersCache;
+ this.outputLogger = outputLogger;
+ }
+
+ @Override
+ public void sendEvent(JSONObject event, String domain) {
+ clearVesUniqueIdFromEvent(event);
+ publishersCache.getPublisher(domain)
+ .onEmpty(() ->
+ log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event)))
+ .forEach(publisher -> sendEvent(event, domain, publisher));
+ }
+
+ @Override
+ public void reconfigure(Map<String, PublisherConfig> dMaaPConfig) {
+ publishersCache.reconfigure(dMaaPConfig);
+ }
+
+ private void sendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) {
+ Try.run(() -> uncheckedSendEvent(event, domain, publisher))
+ .onFailure(exc -> closePublisher(event, domain, exc));
+ }
+
+ private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher)
+ throws IOException {
+ int pendingMsgs = publisher.send("MyPartitionKey", event.toString());
+ if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) {
+ log.info("Pending messages count: " + pendingMsgs);
+ }
+ String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, domain);
+ log.info(infoMsg);
+ outputLogger.info(infoMsg);
+ }
+
+ private void closePublisher(JSONObject event, String domain, Throwable e) {
+ log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.",
+ event, domain), e);
+ publishersCache.closePublisherFor(domain);
+ }
+
+ private void clearVesUniqueIdFromEvent(JSONObject event) {
+ if (event.has(VES_UNIQUE_ID)) {
+ String uuid = event.get(VES_UNIQUE_ID).toString();
+ LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
+ localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
+ log.debug("Removing VESuniqueid object from event");
+ event.remove(VES_UNIQUE_ID);
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java
new file mode 100644
index 00000000..a7865a45
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java
@@ -0,0 +1,62 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.commonFunction.event.publishing;
+
+import static io.vavr.API.Try;
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError;
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
+import io.vavr.control.Try;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+final class DMaaPPublishersBuilder {
+
+ @SuppressWarnings("mapFailure takes a generic varargs, unchecked because of Javas type system limitation, actually safe to do")
+ static Try<CambriaBatchingPublisher> buildPublisher(PublisherConfig config) {
+ return Try(() -> builder(config).build())
+ .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config)));
+ }
+
+ private static PublisherBuilder builder(PublisherConfig config) {
+ if (config.isSecured()) {
+ return authenticatedBuilder(config);
+ } else {
+ return unAuthenticatedBuilder(config);
+ }
+ }
+
+ private static PublisherBuilder authenticatedBuilder(PublisherConfig config) {
+ return unAuthenticatedBuilder(config)
+ .usingHttps()
+ .authenticatedByHttp(config.userName().get(), config.password().get());
+ }
+
+ private static PublisherBuilder unAuthenticatedBuilder(PublisherConfig config) {
+ return new CambriaClientBuilders.PublisherBuilder()
+ .usingHosts(config.destinations().mkString(","))
+ .onTopic(config.topic())
+ .logSendFailuresAfter(5);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java
new file mode 100644
index 00000000..102d2774
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java
@@ -0,0 +1,124 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.commonFunction.event.publishing;
+
+import static io.vavr.API.Option;
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import io.vavr.collection.Map;
+import io.vavr.control.Option;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nonnull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+class DMaaPPublishersCache {
+
+ private static final Logger log = LoggerFactory.getLogger(DMaaPPublishersCache.class);
+ private final LoadingCache<String, CambriaBatchingPublisher> publishersCache;
+ private AtomicReference<Map<String, PublisherConfig>> dMaaPConfiguration;
+
+ DMaaPPublishersCache(Map<String, PublisherConfig> dMaaPConfiguration) {
+ this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration);
+ this.publishersCache = CacheBuilder.newBuilder()
+ .removalListener(new OnPublisherRemovalListener())
+ .build(new CambriaPublishersCacheLoader());
+ }
+
+ DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader,
+ OnPublisherRemovalListener onPublisherRemovalListener,
+ Map<String, PublisherConfig> dMaaPConfiguration) {
+ this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration);
+ this.publishersCache = CacheBuilder.newBuilder()
+ .removalListener(onPublisherRemovalListener)
+ .build(dMaaPPublishersCacheLoader);
+ }
+
+ Option<CambriaBatchingPublisher> getPublisher(String streamID) {
+ try {
+ return Option(publishersCache.getUnchecked(streamID));
+ } catch (Exception e) {
+ log.warn("Could not create / load Cambria Publisher for streamID", e);
+ return Option.none();
+ }
+ }
+
+ void closePublisherFor(String streamId) {
+ publishersCache.invalidate(streamId);
+ }
+
+ synchronized void reconfigure(Map<String, PublisherConfig> newConfig) {
+ Map<String, PublisherConfig> currentConfig = dMaaPConfiguration.get();
+ Map<String, PublisherConfig> removedConfigurations = currentConfig
+ .filterKeys(domain -> !newConfig.containsKey(domain));
+ Map<String, PublisherConfig> changedConfigurations = newConfig
+ .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e)));
+ dMaaPConfiguration.set(newConfig);
+ removedConfigurations.merge(changedConfigurations).forEach(e -> publishersCache.invalidate(e._1));
+ }
+
+ static class OnPublisherRemovalListener implements RemovalListener<String, CambriaBatchingPublisher> {
+
+ @Override
+ public void onRemoval(@Nonnull RemovalNotification<String, CambriaBatchingPublisher> notification) {
+ CambriaBatchingPublisher publisher = notification.getValue();
+ if (publisher != null) { // The value might get Garbage Collected at this moment, regardless of @Nonnull
+ try {
+ int timeout = 20;
+ TimeUnit unit = TimeUnit.SECONDS;
+ java.util.List<?> stuck = publisher.close(timeout, unit);
+ if (!stuck.isEmpty()) {
+ log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', "
+ + "%s messages were dropped", stuck.size(), timeout, unit));
+ }
+ } catch (InterruptedException | IOException e) {
+ log.error("Could not close Cambria publisher, some messages might have been dropped", e);
+ }
+ }
+ }
+ }
+
+ class CambriaPublishersCacheLoader extends CacheLoader<String, CambriaBatchingPublisher> {
+
+ @Override
+ public CambriaBatchingPublisher load(@Nonnull String domain) {
+ return dMaaPConfiguration.get()
+ .get(domain)
+ .toTry(() -> new RuntimeException(
+ f("DMaaP configuration contains no configuration for domain: '%s'", domain)))
+ .flatMap(DMaaPPublishersBuilder::buildPublisher)
+ .get();
+ }
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/EventPublisher.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/EventPublisher.java
new file mode 100644
index 00000000..9cd718f8
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/EventPublisher.java
@@ -0,0 +1,38 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.commonFunction.event.publishing;
+
+import io.vavr.collection.Map;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+public interface EventPublisher {
+
+ static EventPublisher createPublisher(Logger outputLogger, Map<String, PublisherConfig> dMaaPConfig) {
+ return new DMaaPEventPublisher(new DMaaPPublishersCache(dMaaPConfig), outputLogger);
+ }
+
+ void sendEvent(JSONObject event, String domain);
+
+ void reconfigure(Map<String, PublisherConfig> dMaaPConfig);
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java
new file mode 100644
index 00000000..4a056778
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java
@@ -0,0 +1,98 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.commonFunction.event.publishing;
+
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+import java.util.Objects;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+public final class PublisherConfig {
+
+ private final List<String> destinations;
+ private final String topic;
+ private String userName;
+ private String password;
+
+ PublisherConfig(List<String> destinations, String topic) {
+ this.destinations = destinations;
+ this.topic = topic;
+ }
+
+ PublisherConfig(List<String> destinations, String topic, String userName, String password) {
+ this.destinations = destinations;
+ this.topic = topic;
+ this.userName = userName;
+ this.password = password;
+ }
+
+ List<String> destinations() {
+ return destinations;
+ }
+
+ String topic() {
+ return topic;
+ }
+
+ Option<String> userName() {
+ return Option.of(userName);
+ }
+
+ Option<String> password() {
+ return Option.of(password);
+ }
+
+ boolean isSecured() {
+ return userName().isDefined() && password().isDefined();
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PublisherConfig that = (PublisherConfig) o;
+ return Objects.equals(destinations, that.destinations) &&
+ Objects.equals(topic, that.topic) &&
+ Objects.equals(userName, that.userName) &&
+ Objects.equals(password, that.password);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(destinations, topic, userName, password);
+ }
+
+ @Override
+ public String toString() {
+ return "PublisherConfig{" +
+ "destinations=" + destinations +
+ ", topic='" + topic + '\'' +
+ ", userName='" + userName + '\'' +
+ ", password='" + password + '\'' +
+ '}';
+ }
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java
new file mode 100644
index 00000000..9bf3ef8c
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.commonFunction.event.publishing;
+
+import static io.vavr.API.$;
+
+import io.vavr.API;
+import io.vavr.API.Match.Case;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+final class VavrUtils {
+
+ private VavrUtils() {
+ // utils aggregator
+ }
+
+ /**
+ * Shortcut for 'string interpolation'
+ */
+ static String f(String msg, Object... args) {
+ return String.format(msg, args);
+ }
+
+ /**
+ * Wrap failure with a more descriptive message of what has failed and chain original cause. Used to provide a
+ * context for errors instead of raw exception.
+ */
+ static Case<Throwable, Throwable> enhanceError(String msg) {
+ return API.Case($(), e -> new RuntimeException(msg, e));
+ }
+
+}
diff --git a/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java b/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java
deleted file mode 100644
index f4955ac8..00000000
--- a/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dcaegen2.collectors.ves
- * ================================================================================
- * Copyright (C) 2018 Nokia. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.commonFunction;
-
-import static org.hamcrest.CoreMatchers.allOf;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertSame;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.BDDMockito.given;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaPublisher;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-import java.util.concurrent.TimeUnit;
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-
-@RunWith(MockitoJUnitRunner.class)
-public class DmaapPublishersTest {
-
- @Mock
- private CambriaPublisherFactory publisherFactory;
- @Mock
- private CambriaBatchingPublisher cambriaPublisher;
- private DmaapPublishers cut;
-
- @Rule
- public final ExpectedException expectedException = ExpectedException.none();
-
- @Before
- public void setUp() throws MalformedURLException, GeneralSecurityException {
- given(publisherFactory.createCambriaPublisher(anyString())).willReturn(cambriaPublisher);
- cut = DmaapPublishers.create(publisherFactory);
- }
-
- @Test
- public void getByStreamIdShouldUseCachedItem() throws IOException, GeneralSecurityException {
- // given
- String streamId = "sampleStream";
-
- // when
- CambriaBatchingPublisher firstPublisher = cut.getByStreamId(streamId);
- CambriaBatchingPublisher secondPublisher = cut.getByStreamId(streamId);
-
- // then
- verify(publisherFactory, times(1)).createCambriaPublisher(streamId);
- assertSame("should return same instance", firstPublisher, secondPublisher);
- }
-
- @Test
- public void getByStreamIdShouldHandleErrors() throws MalformedURLException, GeneralSecurityException {
- // given
- MalformedURLException exception = new MalformedURLException();
- given(publisherFactory.createCambriaPublisher(anyString())).willThrow(exception);
- expectedException.expect(allOf(
- instanceOf(UncheckedExecutionException.class),
- causeIsInstanceOf(exception.getClass())));
-
- // when
- cut.getByStreamId("a stream");
-
- // then
- // exception should have been thrown
- }
-
- @Test
- public void closeByStreamIdShouldCloseConnection() throws IOException, InterruptedException {
- // given
- String streamId = "sampleStream";
- given(cambriaPublisher.close(anyLong(), any(TimeUnit.class)))
- .willReturn(ImmutableList.of(new CambriaPublisher.message("p", "msg")));
-
- // when
- CambriaBatchingPublisher cachedPublisher = cut.getByStreamId(streamId);
- cut.closeByStreamId(streamId);
-
- // then
- assertSame("should return proper publisher", cambriaPublisher, cachedPublisher);
- verify(cambriaPublisher).close(20, TimeUnit.SECONDS);
- }
-
- @Test
- public void closeByStreamIdShouldHandleErrors() throws IOException, InterruptedException {
- // given
- String streamId = "sampleStream";
- given(cambriaPublisher.close(anyLong(), any(TimeUnit.class))).willThrow(IOException.class);
-
- // when
- CambriaBatchingPublisher cachedPublisher = cut.getByStreamId(streamId);
- cut.closeByStreamId(streamId);
-
- // then
- assertSame("should return proper publisher", cambriaPublisher, cachedPublisher);
- verify(cambriaPublisher).close(20, TimeUnit.SECONDS);
- }
-
- private Matcher<Exception> causeIsInstanceOf(final Class<?> clazz) {
- return new BaseMatcher<Exception>() {
- @Override
- public boolean matches(Object o) {
- return o instanceof Throwable && clazz.isInstance(((Throwable) o).getCause());
- }
-
- @Override
- public void describeTo(Description description) {
- description.appendText("exception cause should be an instance of " + clazz.getName());
- }
- };
- }
-} \ No newline at end of file
diff --git a/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java b/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java
index 973ee014..e211c12a 100644
--- a/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java
+++ b/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java
@@ -20,17 +20,17 @@
*/
package org.onap.dcae.commonFunction;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
import com.google.gson.Gson;
+import java.util.concurrent.atomic.AtomicReference;
import org.json.JSONObject;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import java.util.List;
+import org.onap.dcae.commonFunction.event.publishing.EventPublisher;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -52,7 +52,7 @@ public class EventProcessorTest {
@Test
public void testLoad() {
//given
- EventProcessor ec = new EventProcessor();
+ EventProcessor ec = new EventProcessor(mock(EventPublisher.class));
ec.event = new org.json.JSONObject(ev);
//when
ec.overrideEvent();
@@ -65,7 +65,7 @@ public class EventProcessorTest {
@Test
public void shouldParseJsonEvents() throws ReflectiveOperationException {
//given
- EventProcessor eventProcessor = new EventProcessor();
+ EventProcessor eventProcessor = new EventProcessor(mock(EventPublisher.class));
String event_json = "[{ \"filter\": {\"event.commonEventHeader.domain\":\"heartbeat\",\"VESversion\":\"v4\"},\"processors\":[" +
"{\"functionName\": \"concatenateValue\",\"args\":{\"field\":\"event.commonEventHeader.eventName\",\"concatenate\": [\"$event.commonEventHeader.domain\",\"$event.commonEventHeader.eventType\",\"$event.faultFields.alarmCondition\"], \"delimiter\":\"_\"}}" +
",{\"functionName\": \"addAttribute\",\"args\":{\"field\": \"event.heartbeatFields.heartbeatFieldsVersion\",\"value\": \"1.0\",\"fieldType\": \"number\"}}" +
@@ -84,32 +84,5 @@ public class EventProcessorTest {
assertThat(stringArgumentCaptor.getAllValues()).contains("concatenateValue", "addAttribute", "map");
}
- @Test
- public void shouldCreateDmaapPublisher() {
-
- //given
- EventPublisherHash eph = EventPublisherHash.getInstance();
- EventProcessor ec = new EventProcessor();
- ec.event = new org.json.JSONObject(ev);
- CommonStartup.cambriaConfigFile = "src/test/resources/testDmaapConfig_ip.json";
-
- //when
- CambriaBatchingPublisher pub = eph.getDmaapPublisher("sec_fault_ueb");
-
- //then
- assertNotNull(pub);
- }
-
- @Test
- public void shouldSendEventWithNoError() {
-
- EventPublisherHash eph = EventPublisherHash.getInstance();
- EventProcessor eventProcessor = new EventProcessor();
- eventProcessor.event = new org.json.JSONObject(ev);
- CommonStartup.cambriaConfigFile = "src/test/resources/testDmaapConfig_ip.json";
-
- //when
- eph.sendEvent(eventProcessor.event, "sec_fault_ueb");
- }
}
diff --git a/src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java b/src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java
index e0fd5a42..12428024 100644
--- a/src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java
+++ b/src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java
@@ -22,6 +22,7 @@ package org.onap.dcae.commonFunction;
import static java.util.Base64.getDecoder;
import static java.util.Base64.getEncoder;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.att.nsa.cmdLine.NsaCommandLineUtil;
@@ -43,6 +44,7 @@ import org.json.JSONObject;
import org.junit.Test;
import org.mockito.Mockito;
import org.onap.dcae.commonFunction.CommonStartup.QueueFullException;
+import org.onap.dcae.commonFunction.event.publishing.EventPublisher;
import org.onap.dcae.restapi.RestfulCollectorServlet;
@@ -79,7 +81,7 @@ public class TestCommonStartup {
public void testParseStreamIdToStreamHashMapping() {
// given
CommonStartup.streamID = "fault=sec_fault|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert|voiceQuality=ves_voicequality|sipSignaling=ves_sipsignaling";
- EventProcessor eventProcessor = new EventProcessor();
+ EventProcessor eventProcessor = new EventProcessor(mock(EventPublisher.class));
// when
Map<String, String[]> streamHashMapping = EventProcessor.streamidHash;
diff --git a/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParserTest.java b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParserTest.java
new file mode 100644
index 00000000..5a94c662
--- /dev/null
+++ b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParserTest.java
@@ -0,0 +1,114 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.commonFunction.event.publishing;
+
+import static io.vavr.API.List;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcae.commonFunction.event.publishing.DMaaPConfigurationParser.parseToDomainMapping;
+
+import io.vavr.collection.Map;
+import io.vavr.control.Try;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import org.junit.Test;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+public class DMaaPConfigurationParserTest {
+
+ @Test
+ public void testParseCredentialsForGen2() {
+ Path path = Paths.get("src/test/resources/testParseDMaaPCredentialsGen2.json");
+ Try<Map<String, PublisherConfig>> publisherConfigs = parseToDomainMapping(path);
+
+ PublisherConfig authCredentialsNulls = publisherConfigs.get().get("auth-credentials-null").getOrNull();
+ assertThat(authCredentialsNulls.userName().isEmpty()).isTrue();
+ assertThat(authCredentialsNulls.password().isEmpty()).isTrue();
+ assertThat(authCredentialsNulls.isSecured()).isFalse();
+
+ PublisherConfig authCredentialsPresent = publisherConfigs.get().get("auth-credentials-present").getOrNull();
+ assertThat(authCredentialsPresent.userName().getOrNull()).isEqualTo("sampleUser");
+ assertThat(authCredentialsPresent.password().getOrNull()).isEqualTo("samplePassword");
+ assertThat(authCredentialsPresent.isSecured()).isTrue();
+
+ PublisherConfig authCredentialsKeysMissing = publisherConfigs.get().get("auth-credentials-missing").getOrNull();
+ assertThat(authCredentialsKeysMissing.userName().isEmpty()).isTrue();
+ assertThat(authCredentialsKeysMissing.password().isEmpty()).isTrue();
+ assertThat(authCredentialsKeysMissing.isSecured()).isFalse();
+ }
+
+
+ @Test
+ public void testParseCredentialsForLegacy() {
+ Path path = Paths.get("src/test/resources/testParseDMaaPCredentialsLegacy.json");
+ Try<Map<String, PublisherConfig>> publisherConfigs = parseToDomainMapping(path);
+
+ PublisherConfig authCredentialsNull = publisherConfigs.get().get("auth-credentials-null").getOrNull();
+ assertThat(authCredentialsNull.userName().isEmpty()).isTrue();
+ assertThat(authCredentialsNull.password().isEmpty()).isTrue();
+ assertThat(authCredentialsNull.isSecured()).isFalse();
+
+ PublisherConfig authCredentialsPresent = publisherConfigs.get().get("auth-credentials-present").getOrNull();
+ assertThat(authCredentialsPresent.userName().getOrNull()).isEqualTo("sampleUser");
+ assertThat(authCredentialsPresent.password().getOrNull()).isEqualTo("samplePassword");
+ assertThat(authCredentialsPresent.isSecured()).isTrue();
+
+ PublisherConfig authCredentialsMissing = publisherConfigs.get().get("auth-credentials-missing").getOrNull();
+ assertThat(authCredentialsMissing.userName().isEmpty()).isTrue();
+ assertThat(authCredentialsMissing.password().isEmpty()).isTrue();
+ assertThat(authCredentialsMissing.isSecured()).isFalse();
+ }
+
+
+ @Test
+ public void testParseGen2() {
+ Path path = Paths.get("src/test/resources/testParseDMaaPGen2.json");
+ Try<Map<String, PublisherConfig>> publisherConfigs = parseToDomainMapping(path);
+
+ PublisherConfig withEventsSegment = publisherConfigs.get().get("event-segments-with-port").getOrNull();
+ assertThat(withEventsSegment.destinations()).isEqualTo(List("UEBHOST:3904"));
+ assertThat(withEventsSegment.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV");
+
+ PublisherConfig withOtherSegment = publisherConfigs.get().get("other-segments-without-ports").getOrNull();
+ assertThat(withOtherSegment.destinations()).isEqualTo(List("UEBHOST"));
+ assertThat(withOtherSegment.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV");
+ }
+
+ @Test
+ public void testParseLegacy() {
+ Path exemplaryConfig = Paths.get("src/test/resources/testParseDMaaPLegacy.json");
+ Try<Map<String, PublisherConfig>> publisherConfigs = DMaaPConfigurationParser
+ .parseToDomainMapping(exemplaryConfig);
+
+ PublisherConfig urlFirstThenHosts = publisherConfigs.get().get("url-precedes-hosts").getOrNull();
+ assertThat(urlFirstThenHosts.destinations()).isEqualTo(List("127.0.0.1:3904"));
+ assertThat(urlFirstThenHosts.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV");
+
+ PublisherConfig urlKeyMissing = publisherConfigs.get().get("url-key-missing").getOrNull();
+ assertThat(urlKeyMissing.destinations()).isEqualTo(List("h1.att.com", "h2.att.com"));
+ assertThat(urlKeyMissing.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV");
+
+ PublisherConfig urlIsMissing = publisherConfigs.get().get("url-is-null").getOrNull();
+ assertThat(urlIsMissing.destinations()).isEqualTo(List("h1.att.com", "h2.att.com"));
+ assertThat(urlIsMissing.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV");
+ }
+} \ No newline at end of file
diff --git a/src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisherTest.java
index 81c6556b..bbe5079e 100644
--- a/src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java
+++ b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisherTest.java
@@ -17,74 +17,73 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.commonFunction;
+package org.onap.dcae.commonFunction.event.publishing;
+import static io.vavr.API.Option;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import com.att.nsa.cambria.client.CambriaBatchingPublisher;
import java.io.IOException;
import org.json.JSONObject;
import org.junit.Before;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.slf4j.Logger;
-@RunWith(MockitoJUnitRunner.class)
-public class EventPublisherHashTest {
- private EventPublisherHash cut;
+public class DMaaPEventPublisherTest {
- @Mock
- private DmaapPublishers dmaapPublishers;
- @Mock
+ private static final String STREAM_ID = "sampleStreamId";
+
+ private DMaaPEventPublisher eventPublisher;
private CambriaBatchingPublisher cambriaPublisher;
+ private DMaaPPublishersCache DMaaPPublishersCache;
@Before
public void setUp() {
- given(dmaapPublishers.getByStreamId(anyString())).willReturn(cambriaPublisher);
-
- cut = new EventPublisherHash(dmaapPublishers);
+ cambriaPublisher = mock(CambriaBatchingPublisher.class);
+ DMaaPPublishersCache = mock(DMaaPPublishersCache.class);
+ when(DMaaPPublishersCache.getPublisher(anyString())).thenReturn(Option(cambriaPublisher));
+ eventPublisher = new DMaaPEventPublisher(DMaaPPublishersCache, mock(Logger.class));
}
@Test
- public void sendEventShouldSendEventToATopic() throws Exception {
+ public void shouldSendEventToTopic() throws Exception {
// given
JSONObject event = new JSONObject("{}");
- final String streamId = "sampleStreamId";
// when
- cut.sendEvent(event, streamId);
+ eventPublisher.sendEvent(event, STREAM_ID);
// then
verify(cambriaPublisher).send("MyPartitionKey", event.toString());
}
@Test
- public void sendEventShouldRemoveUuid() throws Exception {
+ public void shouldRemoveInternalVESUIDBeforeSending() throws Exception {
// given
- JSONObject event = new JSONObject("{\"VESuniqueId\": \"362e0146-ec5f-45f3-8d8f-bfe877c3f58e\", \"another\": 8}");
- final String streamId = "sampleStreamId";
+ JSONObject event = new JSONObject(
+ "{\"VESuniqueId\": \"362e0146-ec5f-45f3-8d8f-bfe877c3f58e\", \"another\": 8}");
// when
- cut.sendEvent(event, streamId);
+ eventPublisher.sendEvent(event, STREAM_ID);
// then
verify(cambriaPublisher).send("MyPartitionKey", new JSONObject("{\"another\": 8}").toString());
}
@Test
- public void sendEventShouldCloseConnectionWhenExceptionOccurred() throws Exception {
+ public void shouldCloseConnectionWhenExceptionOccurred() throws Exception {
// given
JSONObject event = new JSONObject("{}");
- final String streamId = "sampleStreamId";
given(cambriaPublisher.send(anyString(), anyString())).willThrow(new IOException("epic fail"));
// when
- cut.sendEvent(event, streamId);
+ eventPublisher.sendEvent(event, STREAM_ID);
// then
- verify(dmaapPublishers).closeByStreamId(streamId);
+ verify(DMaaPPublishersCache).closePublisherFor(STREAM_ID);
}
} \ No newline at end of file
diff --git a/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCacheTest.java b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCacheTest.java
new file mode 100644
index 00000000..8dc69f62
--- /dev/null
+++ b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCacheTest.java
@@ -0,0 +1,126 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.commonFunction.event.publishing;
+
+import static io.vavr.API.List;
+import static io.vavr.API.Map;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import io.vavr.collection.Map;
+import io.vavr.control.Option;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.dcae.commonFunction.event.publishing.DMaaPPublishersCache.CambriaPublishersCacheLoader;
+import org.onap.dcae.commonFunction.event.publishing.DMaaPPublishersCache.OnPublisherRemovalListener;
+
+
+public class DMaaPPublishersCacheTest {
+
+ private String streamId1;
+ private Map<String, PublisherConfig> dMaaPConfigs;
+
+ @Before
+ public void setUp() {
+ streamId1 = "sampleStream1";
+ dMaaPConfigs = Map("sampleStream1", new PublisherConfig(List("destination1"), "topic1"));
+ }
+
+ @Test
+ public void shouldReturnTheSameCachedInstanceOnConsecutiveRetrievals() {
+ // given
+ DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs);
+
+ // when
+ Option<CambriaBatchingPublisher> firstPublisher = dMaaPPublishersCache.getPublisher(streamId1);
+ Option<CambriaBatchingPublisher> secondPublisher = dMaaPPublishersCache.getPublisher(streamId1);
+
+ // then
+ assertSame("should return same instance", firstPublisher.get(), secondPublisher.get());
+ }
+
+ @Test
+ public void shouldCloseCambriaPublisherOnCacheInvalidate() throws IOException, InterruptedException {
+ // given
+ CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class);
+ CambriaPublishersCacheLoader cacheLoaderMock = mock(CambriaPublishersCacheLoader.class);
+ DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock,
+ new OnPublisherRemovalListener(),
+ dMaaPConfigs);
+ when(cacheLoaderMock.load(streamId1)).thenReturn(cambriaPublisherMock1);
+
+ // when
+ dMaaPPublishersCache.getPublisher(streamId1);
+ dMaaPPublishersCache.closePublisherFor(streamId1);
+
+ // then
+ verify(cambriaPublisherMock1).close(20, TimeUnit.SECONDS);
+
+ }
+
+ @Test
+ public void shouldReturnNoneIfThereIsNoDMaaPConfigurationForGivenStreamID() {
+ // given
+ DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs);
+
+ // then
+ assertTrue("should not exist", dMaaPPublishersCache.getPublisher("non-existing").isEmpty());
+ }
+
+
+ @Test
+ public void shouldCloseOnlyChangedPublishers() throws IOException, InterruptedException {
+ // given
+ CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class);
+ CambriaBatchingPublisher cambriaPublisherMock2 = mock(CambriaBatchingPublisher.class);
+ CambriaPublishersCacheLoader cacheLoaderMock = mock(CambriaPublishersCacheLoader.class);
+ String firstDomain = "domain1";
+ String secondDomain = "domain2";
+ Map<String, PublisherConfig> oldConfig = Map(firstDomain,
+ new PublisherConfig(List("destination1"), "topic1"),
+ secondDomain,
+ new PublisherConfig(List("destination2"), "topic2",
+ "user", "pass"));
+ Map<String, PublisherConfig> newConfig = Map(firstDomain, new PublisherConfig(List("destination1"), "topic1"),
+ secondDomain, new PublisherConfig(List("destination2"), "topic2"));
+ DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock,
+ new OnPublisherRemovalListener(),
+ oldConfig);
+ when(cacheLoaderMock.load(firstDomain)).thenReturn(cambriaPublisherMock1);
+ when(cacheLoaderMock.load(secondDomain)).thenReturn(cambriaPublisherMock2);
+
+ dMaaPPublishersCache.getPublisher(firstDomain);
+ dMaaPPublishersCache.getPublisher(secondDomain);
+
+ // when
+ dMaaPPublishersCache.reconfigure(newConfig);
+
+ // then
+ verify(cambriaPublisherMock2).close(20, TimeUnit.SECONDS);
+ verifyZeroInteractions(cambriaPublisherMock1);
+ }
+} \ No newline at end of file
diff --git a/src/test/java/org/onap/dcae/vestest/AnyNodeTest.java b/src/test/java/org/onap/dcae/vestest/AnyNodeTest.java
index 695f53c9..9400e46d 100644
--- a/src/test/java/org/onap/dcae/vestest/AnyNodeTest.java
+++ b/src/test/java/org/onap/dcae/vestest/AnyNodeTest.java
@@ -19,89 +19,45 @@
*/
package org.onap.dcae.vestest;
-import com.google.common.collect.ImmutableMap;
+import static org.assertj.core.api.Assertions.assertThat;
+
import com.google.common.collect.Sets;
-import org.json.JSONObject;
+import java.util.Set;
import org.junit.BeforeClass;
import org.junit.Test;
import org.onap.dcae.commonFunction.AnyNode;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
/**
* Created by koblosz on 07.06.18.
*/
public class AnyNodeTest {
- private static final String SAMPLE_JSON_FILEPATH = "src/test/resources/test_anynode_class.json";
- private static final Map<String, Object> EXPECTED_RAW_MAP = ImmutableMap.<String, Object>builder().put("a", 1).put("b", 2).build();
- private static final Set<String> EXPECTED_JSON_KEYS = Sets.newHashSet("channels", "sampleStrList", "sampleNestedObject", "sampleInt", "sampleString", "sampleNull");
+ private static final String SAMPLE_JSON_FILEPATH = "{\n"
+ + " \"channels\": [{\n"
+ + " \"one\": \"number1\", \"two\": \"number2\", \"three\": \"number3\"}],\n"
+ + " \"sampleStrList\": [\"1\", \"2\", \"3\", \"4\", \"5\"],\n"
+ + " \"sampleNestedObject\": {\"a\": 1, \"b\": 2},\n"
+ + " \"sampleInt\": 1,\n"
+ + " \"sampleString\": \"str\",\n"
+ + " \"sampleNull\": null\n"
+ + "}\n";
+ private static final Set<String> EXPECTED_JSON_KEYS = Sets
+ .newHashSet("channels", "sampleStrList", "sampleNestedObject", "sampleInt", "sampleString", "sampleNull");
private static AnyNode node;
@BeforeClass
- public static void setUpClass() throws IOException {
- node = AnyNode.parse(SAMPLE_JSON_FILEPATH);
- }
-
- @Test(expected = IOException.class)
- public void testShouldRethrowExceptionWhenFileNotFound() throws IOException {
- AnyNode.parse("not/existing/path");
+ public static void setUpClass() {
+ node = AnyNode.fromString(SAMPLE_JSON_FILEPATH);
}
@Test
public void testShouldReturnJsonObjectKeySet() {
- assertThat(node.getKeys()).containsOnlyElementsOf(EXPECTED_JSON_KEYS);
- }
-
- @Test
- public void testShouldGetElementAsString() {
- assertThat(node.get("sampleStrList").get(0).asString()).isEqualTo("1");
- }
-
- @Test
- public void testShouldGetElementAsInt() {
- assertThat(node.get("sampleInt").asInt()).isSameAs(1);
- }
-
- @Test
- public void testWhenNullValuePresentShouldReturnJsonObjectNullAsString() {
- assertThat(node.get("sampleNull").asString()).isSameAs(JSONObject.NULL.toString());
+ assertThat(node.keys()).containsOnlyElementsOf(EXPECTED_JSON_KEYS);
}
- @Test
- public void testShouldGetJsonObjectAsStringToObjectMap() {
- assertThat(node.get("sampleNestedObject").asRawMap()).containsAllEntriesOf(EXPECTED_RAW_MAP);
- }
-
- @Test
- public void testShouldGetAsMap() {
- assertThat(node.asMap().keySet()).containsOnlyElementsOf(EXPECTED_JSON_KEYS);
- }
-
- @Test
- public void testShouldGetAsList() {
- assertThat(node.get("sampleStrList").asList().stream().map(AnyNode::asString).collect(Collectors.toList())).containsExactly("1", "2", "3", "4", "5");
- }
-
- @Test
- public void testShouldGetAsOptional() {
- assertThat(node.getAsOptional("absentKey")).isNotPresent();
- }
-
- @Test
- public void testWhenChainMethodsShouldReturnValue() {
- assertThat(node.get("channels").get(0).get("two").asString()).isEqualTo("number2");
- }
-
-
@Test(expected = ClassCastException.class)
public void whenInvokedOnJsonObjInsteadOfJsonArrShouldRaiseRuntimeEx() {
- node.asList();
+ node.toList();
}
} \ No newline at end of file
diff --git a/src/test/java/org/onap/dcae/vestest/DmaapPropertyReaderTest.java b/src/test/java/org/onap/dcae/vestest/DmaapPropertyReaderTest.java
deleted file mode 100644
index 46f5da4b..00000000
--- a/src/test/java/org/onap/dcae/vestest/DmaapPropertyReaderTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dcaegen2.collectors.ves
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018 Nokia. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.vestest;
-
-import com.google.common.collect.ImmutableMap;
-import org.junit.Test;
-import org.onap.dcae.commonFunction.DmaapPropertyReader;
-
-import java.util.Map;
-
-import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
-
-public class DmaapPropertyReaderTest {
-
-
- private static final String legacyConfigFilePath = "src/test/resources/testDmaapConfig_ip.json";
- private static final String dmaapInputConfigFilePath = "src/test/resources/testDmaapConfig_gen2.json";
- private static final String fullDmaapConfigWithChannels = "src/test/resources/testFullDmaapConfig_channels.json";
- private static final String fullGen2DmaapConfig = "src/test/resources/testFullDmaapConfig_gen2.json";
-
- private static final String FAULT_UEB_KEY_PREFIX = "sec_fault_ueb";
- private static final String VES_ALERT_SND_KEY_PREFIX = "ves-thresholdCrossingAlert-secondary";
- private static final String VES_FAULT_SECONDARY = "ves-fault-secondary";
-
- private static final String FAULT_BASIC_AUTH_USERNAME_KEY = VES_FAULT_SECONDARY + ".basicAuthUsername";
- private static final String ALERT_BASIC_AUTH_PWD_KEY = VES_ALERT_SND_KEY_PREFIX + ".basicAuthPassword";
-
- private static final String VES_ALERT_CAMBRIA_TOPIC_KEY = VES_ALERT_SND_KEY_PREFIX + ".cambria.topic";
- private static final String VES_ALERT_CAMBRIA_URL_KEY = VES_ALERT_SND_KEY_PREFIX + ".cambria.url";
- private static final String VES_FAULT_SND_CAMBRIA_URL_KEY = VES_FAULT_SECONDARY + ".cambria.url";
- private static final String VES_FAULT_SND_AUTH_PWD_KEY = VES_FAULT_SECONDARY + ".basicAuthPassword";
- private static final String VES_FAULT_SND_CAMBRIA_TOPIC_KEY = VES_FAULT_SECONDARY + ".cambria.topic";
- private static final String FAULT_UEB_CAMBRIA_HOSTS_KEY = FAULT_UEB_KEY_PREFIX + ".cambria.hosts";
- private static final String FAULT_UEB_CAMBRIA_TOPIC_KEY = FAULT_UEB_KEY_PREFIX + ".cambria.topic";
- private static final String VES_ALERT_SND_AUTH_USERNAME_KEY = VES_ALERT_SND_KEY_PREFIX + ".basicAuthUsername";
-
- private static final String NULL_TOSTRING = "null";
-
- private static final Map<String, String> expectedCompleteGen2DmaapConfig = ImmutableMap.<String, String>builder()
- .put(ALERT_BASIC_AUTH_PWD_KEY, "SamplePassWD2")
- .put(VES_ALERT_CAMBRIA_TOPIC_KEY, "DCAE-SE-COLLECTOR-EVENTS-DEV")
- .put(FAULT_BASIC_AUTH_USERNAME_KEY, "sampleUsername")
- .put(VES_ALERT_CAMBRIA_URL_KEY, "UEBHOST:3904")
- .put(VES_FAULT_SND_CAMBRIA_URL_KEY, "UEBHOST:3904")
- .put(VES_FAULT_SND_AUTH_PWD_KEY, "SamplePasswd")
- .put(VES_FAULT_SND_CAMBRIA_TOPIC_KEY, "DCAE-SE-COLLECTOR-EVENTS-DEV")
- .put(VES_ALERT_SND_AUTH_USERNAME_KEY, "sampleUsername2")
- .build();
-
- private static final Map<String, String> expectedIncompleteGen2DmaapConfig = ImmutableMap.<String, String>builder()
- .put(VES_ALERT_SND_AUTH_USERNAME_KEY, NULL_TOSTRING)
- .put(FAULT_BASIC_AUTH_USERNAME_KEY, NULL_TOSTRING)
- .put(VES_ALERT_CAMBRIA_TOPIC_KEY, "DCAE-SE-COLLECTOR-EVENTS-DEV")
- .put(VES_ALERT_CAMBRIA_URL_KEY, "UEBHOST:3904")
- .put(VES_FAULT_SND_CAMBRIA_URL_KEY, "UEBHOST:3904")
- .put(ALERT_BASIC_AUTH_PWD_KEY, NULL_TOSTRING)
- .put(VES_FAULT_SND_AUTH_PWD_KEY, NULL_TOSTRING)
- .put(VES_FAULT_SND_CAMBRIA_TOPIC_KEY, "DCAE-SE-COLLECTOR-EVENTS-DEV")
- .build();
-
- private static final Map<String, String> expectedCompleteChannelsDmaapConfig = ImmutableMap.<String, String>builder()
- .put(FAULT_UEB_CAMBRIA_HOSTS_KEY, "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com")
- .put(FAULT_UEB_CAMBRIA_TOPIC_KEY, "DCAE-SE-COLLECTOR-EVENTS-DEV")
- .put(FAULT_UEB_KEY_PREFIX + ".basicAuthPassword", "S0mEPassWD")
- .put(FAULT_UEB_KEY_PREFIX + ".basicAuthUsername", "sampleUser")
- .put(FAULT_UEB_KEY_PREFIX + ".cambria.url", "127.0.0.1:3904")
- .build();
-
- private static final Map<String, String> expectedIncompleteChannelsDmaapConfig = ImmutableMap.<String, String>builder()
- .put(FAULT_UEB_CAMBRIA_HOSTS_KEY, "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com")
- .put(FAULT_UEB_CAMBRIA_TOPIC_KEY, "DCAE-SE-COLLECTOR-EVENTS-DEV")
- .build();
-
- @Test
- public void testShouldCreateReaderWithAbsentParamsOmittedBasedOnChannelDmaapConfig() {
- assertReaderPreservedAllEntriesAfterTransformation(legacyConfigFilePath, expectedIncompleteChannelsDmaapConfig);
- }
-
- @Test
- public void testShouldCreateReaderWithAbsentParamsOmittedBasedOnGen2DmaapConfig() {
- assertReaderPreservedAllEntriesAfterTransformation(dmaapInputConfigFilePath, expectedIncompleteGen2DmaapConfig);
- }
-
- @Test
- public void shouldCreateReaderWithCompleteChannelDmaapConfig() {
- assertReaderPreservedAllEntriesAfterTransformation(fullDmaapConfigWithChannels, expectedCompleteChannelsDmaapConfig);
- }
-
- @Test
- public void shouldCreateReaderWithCompleteGen2DmaapConfig() {
- assertReaderPreservedAllEntriesAfterTransformation(fullGen2DmaapConfig, expectedCompleteGen2DmaapConfig);
- }
-
- private void assertReaderPreservedAllEntriesAfterTransformation(String dmaapConfigFilePath, Map<String, String> expectedMap) {
- DmaapPropertyReader reader = new DmaapPropertyReader(dmaapConfigFilePath);
-
- assertThat(reader.getDmaapProperties()).containsAllEntriesOf(expectedMap);
- assertThat(expectedMap).containsAllEntriesOf(reader.getDmaapProperties());
- }
-
-}
-
diff --git a/src/test/resources/testDmaapConfig_gen2.json b/src/test/resources/testDmaapConfig_gen2.json
deleted file mode 100644
index d8097bf1..00000000
--- a/src/test/resources/testDmaapConfig_gen2.json
+++ /dev/null
@@ -1,24 +0,0 @@
-{
- "ves-fault-secondary": {
- "aaf_username": null,
- "dmaap_info": {
- "topic_url": "http://UEBHOST:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV",
- "location": "mtl5",
- "client_id": null,
- "client_role": null
- },
- "type": "message_router",
- "aaf_password": null
- },
- "ves-thresholdCrossingAlert-secondary": {
- "aaf_username": null,
- "dmaap_info": {
- "topic_url": "http://UEBHOST:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV",
- "location": "mtl5",
- "client_id": null,
- "client_role": null
- },
- "type": "message_router",
- "aaf_password": null
- }
- } \ No newline at end of file
diff --git a/src/test/resources/testFullDmaapConfig_channels.json b/src/test/resources/testFullDmaapConfig_channels.json
deleted file mode 100644
index e2bc0339..00000000
--- a/src/test/resources/testFullDmaapConfig_channels.json
+++ /dev/null
@@ -1,13 +0,0 @@
-{
- "channels": [
- {
- "name": "sec_fault_ueb",
- "cambria.url": "127.0.0.1:3904",
- "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com",
- "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV",
- "basicAuthPassword": "S0mEPassWD",
- "basicAuthUsername": "sampleUser",
- "stripHpId": "true"
- }
- ]
-} \ No newline at end of file
diff --git a/src/test/resources/testParseDMaaPCredentialsGen2.json b/src/test/resources/testParseDMaaPCredentialsGen2.json
new file mode 100644
index 00000000..953cb6e8
--- /dev/null
+++ b/src/test/resources/testParseDMaaPCredentialsGen2.json
@@ -0,0 +1,21 @@
+{
+ "auth-credentials-null": {
+ "aaf_username": null,
+ "dmaap_info": {
+ "topic_url": "http://UEBHOST:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV",
+ },
+ "aaf_password": null
+ },
+ "auth-credentials-present": {
+ "aaf_username": "sampleUser",
+ "dmaap_info": {
+ "topic_url": "http://UEBHOST:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV",
+ },
+ "aaf_password": "samplePassword"
+ },
+ "auth-credentials-missing": {
+ "dmaap_info": {
+ "topic_url": "http://UEBHOST:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV",
+ }
+ }
+} \ No newline at end of file
diff --git a/src/test/resources/testParseDMaaPCredentialsLegacy.json b/src/test/resources/testParseDMaaPCredentialsLegacy.json
new file mode 100644
index 00000000..ca59c7e7
--- /dev/null
+++ b/src/test/resources/testParseDMaaPCredentialsLegacy.json
@@ -0,0 +1,26 @@
+{
+ "channels": [
+ {
+ "name": "auth-credentials-null",
+ "cambria.url": "127.0.0.1:3904",
+ "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com",
+ "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV",
+ "basicAuthPassword": null,
+ "basicAuthUsername": null,
+ },
+ {
+ "name": "auth-credentials-present",
+ "cambria.url": "127.0.0.1:3904",
+ "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com",
+ "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV",
+ "basicAuthPassword": "samplePassword",
+ "basicAuthUsername": "sampleUser",
+ },
+ {
+ "name": "auth-credentials-missing",
+ "cambria.url": "127.0.0.1:3904",
+ "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com",
+ "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV",
+ }
+ ]
+} \ No newline at end of file
diff --git a/src/test/resources/testParseDMaaPGen2.json b/src/test/resources/testParseDMaaPGen2.json
new file mode 100644
index 00000000..5b4fe6a6
--- /dev/null
+++ b/src/test/resources/testParseDMaaPGen2.json
@@ -0,0 +1,12 @@
+{
+ "event-segments-with-port": {
+ "dmaap_info": {
+ "topic_url": "http://UEBHOST:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV",
+ }
+ },
+ "other-segments-without-ports": {
+ "dmaap_info": {
+ "topic_url": "http://UEBHOST:3904/somethingHere/DCAE-SE-COLLECTOR-EVENTS-DEV",
+ }
+ }
+} \ No newline at end of file
diff --git a/src/test/resources/testParseDMaaPLegacy.json b/src/test/resources/testParseDMaaPLegacy.json
new file mode 100644
index 00000000..9661e30c
--- /dev/null
+++ b/src/test/resources/testParseDMaaPLegacy.json
@@ -0,0 +1,21 @@
+{
+ "channels": [
+ {
+ "name": "url-precedes-hosts",
+ "cambria.url": "127.0.0.1:3904",
+ "cambria.hosts": "h1.att.com,h2.att.com",
+ "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV",
+ },
+ {
+ "name": "url-key-missing",
+ "cambria.hosts": "h1.att.com,h2.att.com",
+ "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV",
+ },
+ {
+ "name": "url-is-null",
+ "cambria.url": null,
+ "cambria.hosts": "h1.att.com,h2.att.com",
+ "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV"
+ }
+ ]
+} \ No newline at end of file