diff options
29 files changed, 1189 insertions, 1104 deletions
@@ -386,6 +386,12 @@ limitations under the License. <artifactId>mail</artifactId> <version>1.4.7</version> </dependency> + <dependency> + <groupId>io.vavr</groupId> + <artifactId>vavr</artifactId> + <version>0.9.2</version> + </dependency> + <!-- TESTING --> <dependency> diff --git a/src/main/java/org/onap/dcae/commonFunction/AnyNode.java b/src/main/java/org/onap/dcae/commonFunction/AnyNode.java index 267c87a9..97d73ddd 100644 --- a/src/main/java/org/onap/dcae/commonFunction/AnyNode.java +++ b/src/main/java/org/onap/dcae/commonFunction/AnyNode.java @@ -19,164 +19,95 @@ */ package org.onap.dcae.commonFunction; +import static io.vavr.API.Set; + +import io.vavr.collection.List; +import io.vavr.collection.Set; +import io.vavr.control.Option; +import java.util.stream.StreamSupport; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; -import org.json.JSONTokener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; /** - * This class is a wrapper for 2 most used entities of org.json lib: JSONArray and JSONObject and - * comprises utility methods for fast access of json structures without need to explicitly coerce between them. - * While using this, bear in mind it does not contain exception handling - it is assumed that when using, the parsed json structure is known. + * This class is a wrapper for 2 most used entities of org.json lib: JSONArray and JSONObject and comprises utility + * methods for fast access of json structures without need to explicitly coerce between them. While using this, bear in + * mind it does not contain exception handling - it is assumed that when using, the parsed json structure is known. * * @author koblosz */ public class AnyNode { - private final Object obj; - private static final Logger log = LoggerFactory.getLogger(AnyNode.class); - public static AnyNode parse(String filePath) throws IOException { - try (FileReader fr = new FileReader(filePath)) { - return new AnyNode(new JSONObject(new JSONTokener(fr))); - } catch (FileNotFoundException | JSONException e1) { - log.error("Could not find or parse file under path %s due to: %s", filePath, e1.toString()); - e1.printStackTrace(); - throw e1; - } - } + private Object obj; - /** - * Returns key set of underlying object. It is assumed that underlying object is of type org.json.JSONObject. - * - * @return Set of string keys present in underlying JSONObject - */ - public Set<String> getKeys() { - return asJsonObject().keySet(); + private AnyNode(Object object) { + this.obj = object; } - /** - * Returns value associated with specified key wrapped with AnyValue object. It is assumed that this is of type org.json.JSONObject. - * - * @param key A key string - * @return The AnyNode object associated with given key. - */ - public AnyNode get(String key) { - return new AnyNode(asJsonObject().get(key)); + public static AnyNode fromString(String content) { + return new AnyNode(new JSONObject(content)); } /** - * Returns value under specified index wrapped with AnyValue object. It is assumed that this is of type org.json.JSONArray. - * - * @param idx An index of JSONArray - * @return The AnyNode object associated with given index. + * Returns key set of underlying object. It is assumed that underlying object is of type org.json.JSONObject. */ - public AnyNode get(int idx) { - return new AnyNode(asJsonArray().get(idx)); + public Set<String> keys() { + return Set(asJsonObject().keySet().toArray(new String[]{})); } /** - * Returns int assuming this can be coerced to int. + * Returns value associated with specified key wrapped with AnyValue object. It is assumed that this is of type + * org.json.JSONObject. */ - public int asInt() { - return (int) this.obj; + public AnyNode get(String key) { + return new AnyNode(asJsonObject().get(key)); } /** - * Returns string representation of this. If it happens to have null, the value is treated as org.json.JSONObject.NULL and "null" string is returned then. - * - * @return A String + * Returns string representation of this. If it happens to have null, the value is treated as + * org.json.JSONObject.NULL and "null" string is returned then. */ - public String asString() { - return this.obj != JSONObject.NULL ? (String) this.obj : JSONObject.NULL.toString(); - } - public String toString() { return this.obj.toString(); } /** - * Converts underlying object to String-to-Object map. It is assumed that underlying object is of type org.json.JSONObject. - * - * @return A map. + * Returns optional of object under specified key, wrapped with AnyNode object. + * If underlying object is not of type org.json.JSONObject + * or underlying object has no given key + * or given key is null + * then Optional.empty will be returned. */ - public Map<String, Object> asRawMap() { - return asJsonObject().toMap(); - } - - /** - * Returns optional of object under specified key, wrapped with AnyNode object. If underlying object is not of type org.json.JSONObject, then Optional.empty will be returned. - * - * @param key A key string - */ - public Optional<AnyNode> getAsOptional(String key) { - AnyNode result = null; + public Option<AnyNode> getAsOption(String key) { try { - result = get(key); - } catch (JSONException ignored) { + AnyNode value = get(key); + if (value.toString().equals("null")) { + return Option.none(); + } + return Option.some(value); + } catch (JSONException ex) { + return Option.none(); } - return Optional.ofNullable(result); - } - - private JSONObject asJsonObject() { - return (JSONObject) this.obj; - } - - /** - * Converts underlying object to map representation with map values wrapped with AnyNode object. It is assumed that underlying object is of type org.json.JSONObject. - */ - public Map<String, AnyNode> asMap() { - Map<String, AnyNode> map = new HashMap<>(); - getKeys().forEach(key -> map.put(key, get(key))); - return map; - } - - /** - * Converts underlying object to map representation with map values wrapped with AnyNode object. It is assumed that underlying object is of type org.json.JSONObject. - */ - public java.util.List<AnyNode> asList() { - return asStream().collect(Collectors.toList()); } /** - * Converts this object to stream of underlying objects wrapped with AnyNode class. It is assumed that this is of type JSONArray. + * Converts underlying object to map representation with map values wrapped with AnyNode object. It is assumed that + * underlying object is of type org.json.JSONObject. */ - private Stream<AnyNode> asStream() { - return StreamSupport.stream(((JSONArray) this.obj).spliterator(), false).map(AnyNode::new); + public List<AnyNode> toList() { + return List.ofAll(StreamSupport.stream(((JSONArray) this.obj).spliterator(), false).map(AnyNode::new)); } /** * Checks if specified key is present in this. It is assumed that this is of type JSONObject. */ - boolean hasKey(String key) { - return getAsOptional(key).isPresent(); - } - - /** - * Returns empty AnyNode (with null inside) - */ - public static AnyNode nullValue() { - return new AnyNode(JSONObject.NULL.toString()); + public boolean has(String key) { + return !getAsOption(key).isEmpty(); } - private JSONArray asJsonArray() { - return (JSONArray) this.obj; + private JSONObject asJsonObject() { + return (JSONObject) this.obj; } - private AnyNode(Object object) { - this.obj = object; - } } diff --git a/src/main/java/org/onap/dcae/commonFunction/CambriaPublisherFactory.java b/src/main/java/org/onap/dcae/commonFunction/CambriaPublisherFactory.java deleted file mode 100644 index 79109c04..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/CambriaPublisherFactory.java +++ /dev/null @@ -1,64 +0,0 @@ -package org.onap.dcae.commonFunction; -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class CambriaPublisherFactory { - - private final static Logger log = LoggerFactory.getLogger(CambriaPublisherFactory.class); - - CambriaBatchingPublisher createCambriaPublisher(String streamId) - throws MalformedURLException, GeneralSecurityException { - String authpwd = null; - DmaapPropertyReader reader = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile); - Map<String, String> dMaaPProperties = reader.getDmaapProperties(); - String ueburl = dMaaPProperties.get(streamId + ".cambria.url"); - - if (ueburl == null) { - ueburl = dMaaPProperties.get(streamId + ".cambria.hosts"); - } - String topic = reader.getKeyValue(streamId + ".cambria.topic"); - String authuser = reader.getKeyValue(streamId + ".basicAuthUsername"); - - if (authuser != null) { - authpwd = dMaaPProperties.get(streamId + ".basicAuthPassword"); - } - - if ((authuser != null) && (authpwd != null)) { - log.debug(String.format("URL:%sTOPIC:%sAuthUser:%sAuthpwd:%s", ueburl, topic, authuser, authpwd)); - return new CambriaClientBuilders.PublisherBuilder().usingHosts(ueburl).onTopic(topic).usingHttps() - .authenticatedByHttp(authuser, authpwd).logSendFailuresAfter(5) - .build(); - } else { - log.debug(String.format("URL:%sTOPIC:%s", ueburl, topic)); - return new CambriaClientBuilders.PublisherBuilder().usingHosts(ueburl).onTopic(topic) - .logSendFailuresAfter(5) - .build(); - } - } -} diff --git a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java index b60104ea..3469531e 100644 --- a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java +++ b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -39,15 +39,6 @@ import com.github.fge.jsonschema.core.report.ProcessingMessage; import com.github.fge.jsonschema.core.report.ProcessingReport;
import com.github.fge.jsonschema.main.JsonSchema;
import com.github.fge.jsonschema.main.JsonSchemaFactory;
-
-import org.apache.catalina.LifecycleException;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.onap.dcae.restapi.RestfulCollectorServlet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
@@ -58,185 +49,193 @@ import java.util.Map; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.catalina.LifecycleException;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.onap.dcae.commonFunction.event.publishing.DMaaPConfigurationParser;
+import org.onap.dcae.commonFunction.event.publishing.EventPublisher;
+import org.onap.dcae.restapi.RestfulCollectorServlet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class CommonStartup extends NsaBaseEndpoint implements Runnable {
- private static final String KCONFIG = "c";
- private static final String KSETTING_PORT = "collector.service.port";
- private static final int KDEFAULT_PORT = 8080;
- private static final String KSETTING_SECUREPORT = "collector.service.secure.port";
- private static final int KDEFAULT_SECUREPORT = -1;
- private static final String KSETTING_KEYSTOREPASSFILE = "collector.keystore.passwordfile";
- private static final String KDEFAULT_KEYSTOREPASSFILE = "../etc/passwordfile";
- private static final String KSETTING_KEYSTOREFILE = "collector.keystore.file.location";
- private static final String KDEFAULT_KEYSTOREFILE = "../etc/keystore";
- private static final String KSETTING_KEYALIAS = "collector.keystore.alias";
- private static final String KDEFAULT_KEYALIAS = "tomcat";
- private static final String KSETTING_DMAAPCONFIGS = "collector.dmaapfile";
- private static final String[] KDEFAULT_DMAAPCONFIGS = new String[] { "/etc/DmaapConfig.json" };
- private static final String KSETTING_SCHEMAVALIDATOR = "collector.schema.checkflag";
- private static final int KDEFAULT_SCHEMAVALIDATOR = -1;
- private static final String KSETTING_SCHEMAFILE = "collector.schema.file";
- private static final String KDEFAULT_SCHEMAFILE = "{\"v5\":\"./etc/CommonEventFormat_28.3.json\"}";
- private static final String KSETTING_DMAAPSTREAMID = "collector.dmaap.streamid";
- private static final String KSETTING_AUTHFLAG = "header.authflag";
- private static final int KDEFAULT_AUTHFLAG = 0;
- private static final String KSETTING_EVENTTRANSFORMFLAG = "event.transform.flag";
- private static final int KDEFAULT_EVENTTRANSFORMFLAG = 1;
- private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");
- public static final Logger inlog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.input");
- static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.output");
- public static final Logger eplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.error");
-
- public static final String KSETTING_AUTHLIST = "header.authlist";
- static final int KDEFAULT_MAXQUEUEDEVENTS = 1024 * 4;
- public static int schemaValidatorflag = -1;
- public static int authflag = 1;
- static int eventTransformFlag = 1;
- public static JSONObject schemaFileJson;
- static String cambriaConfigFile;
- public static String streamID;
-
- static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;
- private static ApiServer fTomcatServer = null;
- private static final Logger log = LoggerFactory.getLogger(CommonStartup.class);
-
- private CommonStartup(rrNvReadable settings) throws loadException, IOException, rrNvReadable.missingReqdSetting {
- final List<ApiServerConnector> connectors = new LinkedList<>();
-
- if (settings.getInt(KSETTING_PORT, KDEFAULT_PORT) > 0) {
- connectors.add(new ApiServerConnector.Builder(settings.getInt(KSETTING_PORT, KDEFAULT_PORT)).secure(false)
- .build());
- }
-
- final int securePort = settings.getInt(KSETTING_SECUREPORT, KDEFAULT_SECUREPORT);
- final String keystoreFile = settings.getString(KSETTING_KEYSTOREFILE, KDEFAULT_KEYSTOREFILE);
- final String keystorePasswordFile = settings.getString(KSETTING_KEYSTOREPASSFILE, KDEFAULT_KEYSTOREPASSFILE);
- final String keyAlias = settings.getString(KSETTING_KEYALIAS, KDEFAULT_KEYALIAS);
-
- if (securePort > 0) {
- String keystorePassword = readFile(keystorePasswordFile);
- connectors.add(new ApiServerConnector.Builder(securePort).secure(true)
- .keystorePassword(keystorePassword).keystoreFile(keystoreFile).keyAlias(keyAlias).build());
-
- }
-
- schemaValidatorflag = settings.getInt(KSETTING_SCHEMAVALIDATOR, KDEFAULT_SCHEMAVALIDATOR);
- if (schemaValidatorflag > 0) {
- String schemaFile = settings.getString(KSETTING_SCHEMAFILE, KDEFAULT_SCHEMAFILE);
- schemaFileJson = new JSONObject(schemaFile);
-
- }
- authflag = settings.getInt(CommonStartup.KSETTING_AUTHFLAG, CommonStartup.KDEFAULT_AUTHFLAG);
- String[] currentConfigFile = settings.getStrings(KSETTING_DMAAPCONFIGS, KDEFAULT_DMAAPCONFIGS);
- cambriaConfigFile = currentConfigFile[0];
- streamID = settings.getString(KSETTING_DMAAPSTREAMID, null);
- eventTransformFlag = settings.getInt(KSETTING_EVENTTRANSFORMFLAG, KDEFAULT_EVENTTRANSFORMFLAG);
-
- fTomcatServer = new ApiServer.Builder(connectors, new RestfulCollectorServlet(settings)).encodeSlashes(true)
- .name("collector").build();
- }
-
- public static void main(String[] args) {
- try {
- final Map<String, String> argMap = NsaCommandLineUtil.processCmdLine(args, true);
- final String config = NsaCommandLineUtil.getSetting(argMap, KCONFIG, "collector.properties");
- final URL settingStream = DrumlinServlet.findStream(config, CommonStartup.class);
-
- final nvReadableStack settings = new nvReadableStack();
- settings.push(new nvPropertiesFile(settingStream));
- settings.push(new nvReadableTable(argMap));
-
- fProcessingInputQueue = new LinkedBlockingQueue<>(CommonStartup.KDEFAULT_MAXQUEUEDEVENTS);
-
- VESLogger.setUpEcompLogging();
-
- CommonStartup cs = new CommonStartup(settings);
-
- Thread commonStartupThread = new Thread(cs);
- commonStartupThread.start();
-
- EventProcessor ep = new EventProcessor();
- ExecutorService executor = Executors.newFixedThreadPool(20);
- for (int i = 0; i < 20; ++i) {
- executor.execute(ep);
- }
-
- } catch (loadException | missingReqdSetting | IOException e) {
- CommonStartup.eplog.error("FATAL_STARTUP_ERROR" + e.getMessage());
- throw new RuntimeException(e);
- } catch (Exception e) {
- System.err.println("Uncaught exception - " + e.getMessage());
- CommonStartup.eplog.error("FATAL_ERROR" + e.getMessage());
- e.printStackTrace(System.err);
- }
- }
-
- public void run() {
- try {
- fTomcatServer.start();
- fTomcatServer.await();
- } catch (LifecycleException | IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public static class QueueFullException extends Exception {
- private static final long serialVersionUID = 1L;
- }
-
- public static void handleEvents(JSONArray a) throws QueueFullException, JSONException {
- CommonStartup.metriclog.info("EVENT_PUBLISH_START");
- for (int i = 0; i < a.length(); i++) {
- if (!fProcessingInputQueue.offer(a.getJSONObject(i))) {
- throw new QueueFullException();
- }
- }
- log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
- CommonStartup.metriclog.info("EVENT_PUBLISH_END");
- }
-
- private static String readFile(String path) throws IOException {
- byte[] encoded = Files.readAllBytes(Paths.get(path));
- String pwd = new String(encoded);
- return pwd.substring(0, pwd.length() - 1);
- }
-
- public static String validateAgainstSchema(String jsonData, String jsonSchema) {
- ProcessingReport report;
- String result = "false";
-
- try {
- log.trace("Schema validation for event:" + jsonData);
- JsonNode schemaNode = JsonLoader.fromString(jsonSchema);
- JsonNode data = JsonLoader.fromString(jsonData);
- JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
- JsonSchema schema = factory.getJsonSchema(schemaNode);
- report = schema.validate(data);
- } catch (JsonParseException e) {
- log.error("validateAgainstSchema:JsonParseException for event:" + jsonData);
- return e.getMessage();
- } catch (ProcessingException e) {
- log.error("validateAgainstSchema:Processing exception for event:" + jsonData);
- return e.getMessage();
- } catch (IOException e) {
- log.error(
- "validateAgainstSchema:IO exception; something went wrong trying to read json data for event:" + jsonData);
- return e.getMessage();
- }
- if (report != null) {
- for (ProcessingMessage pm : report) {
- log.trace("Processing Message: " + pm.getMessage());
- }
- result = String.valueOf(report.isSuccess());
- }
- try {
- log.debug("Validation Result:" + result + " Validation report:" + report);
- } catch (NullPointerException e) {
- log.error("validateAgainstSchema:NullpointerException on report");
- }
- return result;
- }
+ private static final String KCONFIG = "c";
+ private static final String KSETTING_PORT = "collector.service.port";
+ private static final int KDEFAULT_PORT = 8080;
+ private static final String KSETTING_SECUREPORT = "collector.service.secure.port";
+ private static final int KDEFAULT_SECUREPORT = -1;
+ private static final String KSETTING_KEYSTOREPASSFILE = "collector.keystore.passwordfile";
+ private static final String KDEFAULT_KEYSTOREPASSFILE = "../etc/passwordfile";
+ private static final String KSETTING_KEYSTOREFILE = "collector.keystore.file.location";
+ private static final String KDEFAULT_KEYSTOREFILE = "../etc/keystore";
+ private static final String KSETTING_KEYALIAS = "collector.keystore.alias";
+ private static final String KDEFAULT_KEYALIAS = "tomcat";
+ private static final String KSETTING_DMAAPCONFIGS = "collector.dmaapfile";
+ private static final String[] KDEFAULT_DMAAPCONFIGS = new String[]{"/etc/DmaapConfig.json"};
+ private static final String KSETTING_SCHEMAVALIDATOR = "collector.schema.checkflag";
+ private static final int KDEFAULT_SCHEMAVALIDATOR = -1;
+ private static final String KSETTING_SCHEMAFILE = "collector.schema.file";
+ private static final String KDEFAULT_SCHEMAFILE = "{\"v5\":\"./etc/CommonEventFormat_28.3.json\"}";
+ private static final String KSETTING_DMAAPSTREAMID = "collector.dmaap.streamid";
+ private static final String KSETTING_AUTHFLAG = "header.authflag";
+ private static final int KDEFAULT_AUTHFLAG = 0;
+ private static final String KSETTING_EVENTTRANSFORMFLAG = "event.transform.flag";
+ private static final int KDEFAULT_EVENTTRANSFORMFLAG = 1;
+ private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");
+ public static final Logger inlog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.input");
+ static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.output");
+ public static final Logger eplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.error");
+
+ public static final String KSETTING_AUTHLIST = "header.authlist";
+ static final int KDEFAULT_MAXQUEUEDEVENTS = 1024 * 4;
+ public static int schemaValidatorflag = -1;
+ public static int authflag = 1;
+ static int eventTransformFlag = 1;
+ public static JSONObject schemaFileJson;
+ static String cambriaConfigFile;
+ public static String streamID;
+
+ static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;
+ private static ApiServer fTomcatServer = null;
+ private static final Logger log = LoggerFactory.getLogger(CommonStartup.class);
+
+ private CommonStartup(rrNvReadable settings) throws loadException, IOException, rrNvReadable.missingReqdSetting {
+ final List<ApiServerConnector> connectors = new LinkedList<>();
+
+ if (settings.getInt(KSETTING_PORT, KDEFAULT_PORT) > 0) {
+ connectors.add(new ApiServerConnector.Builder(settings.getInt(KSETTING_PORT, KDEFAULT_PORT)).secure(false)
+ .build());
+ }
+
+ final int securePort = settings.getInt(KSETTING_SECUREPORT, KDEFAULT_SECUREPORT);
+ final String keystoreFile = settings.getString(KSETTING_KEYSTOREFILE, KDEFAULT_KEYSTOREFILE);
+ final String keystorePasswordFile = settings.getString(KSETTING_KEYSTOREPASSFILE, KDEFAULT_KEYSTOREPASSFILE);
+ final String keyAlias = settings.getString(KSETTING_KEYALIAS, KDEFAULT_KEYALIAS);
+
+ if (securePort > 0) {
+ String keystorePassword = readFile(keystorePasswordFile);
+ connectors.add(new ApiServerConnector.Builder(securePort).secure(true)
+ .keystorePassword(keystorePassword).keystoreFile(keystoreFile).keyAlias(keyAlias).build());
+
+ }
+
+ schemaValidatorflag = settings.getInt(KSETTING_SCHEMAVALIDATOR, KDEFAULT_SCHEMAVALIDATOR);
+ if (schemaValidatorflag > 0) {
+ String schemaFile = settings.getString(KSETTING_SCHEMAFILE, KDEFAULT_SCHEMAFILE);
+ schemaFileJson = new JSONObject(schemaFile);
+
+ }
+ authflag = settings.getInt(CommonStartup.KSETTING_AUTHFLAG, CommonStartup.KDEFAULT_AUTHFLAG);
+ String[] currentConfigFile = settings.getStrings(KSETTING_DMAAPCONFIGS, KDEFAULT_DMAAPCONFIGS);
+ cambriaConfigFile = currentConfigFile[0];
+ streamID = settings.getString(KSETTING_DMAAPSTREAMID, null);
+ eventTransformFlag = settings.getInt(KSETTING_EVENTTRANSFORMFLAG, KDEFAULT_EVENTTRANSFORMFLAG);
+
+ fTomcatServer = new ApiServer.Builder(connectors, new RestfulCollectorServlet(settings)).encodeSlashes(true)
+ .name("collector").build();
+ }
+
+ public static void main(String[] args) {
+ try {
+ final Map<String, String> argMap = NsaCommandLineUtil.processCmdLine(args, true);
+ final String config = NsaCommandLineUtil.getSetting(argMap, KCONFIG, "collector.properties");
+ final URL settingStream = DrumlinServlet.findStream(config, CommonStartup.class);
+
+ final nvReadableStack settings = new nvReadableStack();
+ settings.push(new nvPropertiesFile(settingStream));
+ settings.push(new nvReadableTable(argMap));
+
+ fProcessingInputQueue = new LinkedBlockingQueue<>(CommonStartup.KDEFAULT_MAXQUEUEDEVENTS);
+
+ VESLogger.setUpEcompLogging();
+
+ CommonStartup cs = new CommonStartup(settings);
+
+ Thread commonStartupThread = new Thread(cs);
+ commonStartupThread.start();
+
+ EventProcessor ep = new EventProcessor(EventPublisher.createPublisher(oplog,
+ DMaaPConfigurationParser
+ .parseToDomainMapping(Paths.get(cambriaConfigFile))
+ .get()));
+ ExecutorService executor = Executors.newFixedThreadPool(20);
+ for (int i = 0; i < 20; ++i) {
+ executor.execute(ep);
+ }
+ } catch (Exception e) {
+ CommonStartup.eplog.error("Fatal error during application startup", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void run() {
+ try {
+ fTomcatServer.start();
+ fTomcatServer.await();
+ } catch (LifecycleException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static class QueueFullException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+ }
+
+ public static void handleEvents(JSONArray a) throws QueueFullException, JSONException {
+ CommonStartup.metriclog.info("EVENT_PUBLISH_START");
+ for (int i = 0; i < a.length(); i++) {
+ if (!fProcessingInputQueue.offer(a.getJSONObject(i))) {
+ throw new QueueFullException();
+ }
+ }
+ log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
+ CommonStartup.metriclog.info("EVENT_PUBLISH_END");
+ }
+
+ private static String readFile(String path) throws IOException {
+ byte[] encoded = Files.readAllBytes(Paths.get(path));
+ String pwd = new String(encoded);
+ return pwd.substring(0, pwd.length() - 1);
+ }
+
+ public static String validateAgainstSchema(String jsonData, String jsonSchema) {
+ ProcessingReport report;
+ String result = "false";
+
+ try {
+ log.trace("Schema validation for event:" + jsonData);
+ JsonNode schemaNode = JsonLoader.fromString(jsonSchema);
+ JsonNode data = JsonLoader.fromString(jsonData);
+ JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
+ JsonSchema schema = factory.getJsonSchema(schemaNode);
+ report = schema.validate(data);
+ } catch (JsonParseException e) {
+ log.error("validateAgainstSchema:JsonParseException for event:" + jsonData);
+ return e.getMessage();
+ } catch (ProcessingException e) {
+ log.error("validateAgainstSchema:Processing exception for event:" + jsonData);
+ return e.getMessage();
+ } catch (IOException e) {
+ log.error(
+ "validateAgainstSchema:IO exception; something went wrong trying to read json data for event:" + jsonData);
+ return e.getMessage();
+ }
+ if (report != null) {
+ for (ProcessingMessage pm : report) {
+ log.trace("Processing Message: " + pm.getMessage());
+ }
+ result = String.valueOf(report.isSuccess());
+ }
+ try {
+ log.debug("Validation Result:" + result + " Validation report:" + report);
+ } catch (NullPointerException e) {
+ log.error("validateAgainstSchema:NullpointerException on report");
+ }
+ return result;
+ }
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java b/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java deleted file mode 100644 index 0ee1e434..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java +++ /dev/null @@ -1,143 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * PROJECT - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcae.commonFunction; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.stream.Collectors; - -public class DmaapPropertyReader { - - - private static final Logger log = LoggerFactory.getLogger(DmaapPropertyReader.class); - private static final String CAMBRIA_TOPIC_KEY = "cambria.topic"; - private static final String CAMBRIA_HOSTS_KEY = "cambria.hosts"; - private static final String CAMBRIA_URL_KEY = "cambria.url"; - private static final List<String> LEGACY_CHANNEL_MANDATORY_PARAMS = Lists.newArrayList(CAMBRIA_TOPIC_KEY, CAMBRIA_HOSTS_KEY, CAMBRIA_URL_KEY, "basicAuthPassword", "basicAuthUsername"); - private static DmaapPropertyReader instance = null; - private final Map<String, String> dmaapProperties; - - public DmaapPropertyReader(String cambriaConfigFilePath) { - this.dmaapProperties = DmaapPropertyReader.getProcessedDmaapProperties(cambriaConfigFilePath); - } - - public Map<String, String> getDmaapProperties() { - return dmaapProperties; - } - - public static synchronized DmaapPropertyReader getInstance(String channelConfig) { - if (instance == null) { - instance = new DmaapPropertyReader(channelConfig); - } - return instance; - } - - public String getKeyValue(String hashKey) { - return dmaapProperties.get(hashKey); - } - - private static Map<String, String> getProcessedDmaapProperties(String configFilePath) { - Map<String, String> transformedDmaapProperties = new HashMap<>(); - try { - AnyNode root = AnyNode.parse(configFilePath); - if (isInLegacyFormat(root)) { - transformedDmaapProperties = getLegacyDmaapPropertiesWithChannels(root.get("channels")); - } else { - transformedDmaapProperties = getDmaapPropertiesWithInfoData(root); - } - } catch (IOException e) { - e.printStackTrace(); - } - return transformedDmaapProperties; - } - - private static boolean isInLegacyFormat(AnyNode root) { - return root.hasKey("channels"); - } - - private static Map<String, String> getLegacyDmaapPropertiesWithChannels(AnyNode channelsNode) { - return channelsNode.asList().stream() - .map(DmaapPropertyReader::getTransformedMandatoryChannelProperties) - .flatMap(m -> m.entrySet().stream()) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } - - private static Map<String, String> getTransformedMandatoryChannelProperties(AnyNode channel) { - String prefix = channel.get("name").asString() + "."; - return channel.asMap().entrySet().stream().filter(el -> LEGACY_CHANNEL_MANDATORY_PARAMS.contains(el.getKey()) && !Objects.equals(el.getKey(), "name")) - .collect(Collectors.toMap(k -> prefix + k.getKey(), v -> v.getValue().asString().replace("\"", ""))); - } - - private static Map<String, String> getDmaapPropertiesWithInfoData(AnyNode root) { - return root.asMap().entrySet().stream() - .map(DmaapPropertyReader::getTransformedMandatoryInfoProperties).flatMap(m -> m.entrySet().stream()) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } - - private static Map<String, String> getTransformedMandatoryInfoProperties(Map.Entry<String, AnyNode> el) { - String prefix = el.getKey() + "."; - AnyNode val = el.getValue(); - Map<String, String> map = Maps.newHashMap(); - map.put(prefix + "basicAuthUsername", val.getAsOptional("aaf_username").orElse(AnyNode.nullValue()).asString().replace("\"", "")); - map.put(prefix + "basicAuthPassword", val.getAsOptional("aaf_password").orElse(AnyNode.nullValue()).asString().replace("\"", "")); - map.putAll(getParamsFromDmaapInfoTopicUrl(prefix, val.get("dmaap_info").get("topic_url").asString().replace("\"", ""))); - return map; - } - - /*** - * Dmaap url structure pub - https://<dmaaphostname>:<port>/events/ - * <namespace>.<dmaapcluster>.<topic>, sub - https://<dmaaphostname>: - * <port>/events/<namespace>.<dmaapcluster>.<topic>/G1/u1"; - * - * Onap url structure pub - http://<dmaaphostname>:<port>/<unauthenticated>. - * <topic>, - */ - private static Map<String, String> getParamsFromDmaapInfoTopicUrl(String keyPrefix, String topicUrl) { - Map<String, String> topicUrlParts = Maps.newHashMap(); - try { - URL url = new URL(topicUrl); - topicUrlParts.put(keyPrefix + CAMBRIA_URL_KEY, url.getAuthority()); - String[] pathParts = url.getPath().split("/"); - if (pathParts.length > 2 && "events".equals(pathParts[1])) { - // DCAE internal dmaap topic convention - topicUrlParts.put(keyPrefix + CAMBRIA_TOPIC_KEY, pathParts[2]); - } else { - // ONAP dmaap topic convention - topicUrlParts.put(keyPrefix + CAMBRIA_TOPIC_KEY, pathParts[2]); - topicUrlParts.put(keyPrefix + CAMBRIA_HOSTS_KEY, url.getHost()); - } - } catch (MalformedURLException e) { - log.error("Invalid URL found under topic_url key!", e); - e.printStackTrace(); - } - return topicUrlParts; - } -}
\ No newline at end of file diff --git a/src/main/java/org/onap/dcae/commonFunction/DmaapPublishers.java b/src/main/java/org/onap/dcae/commonFunction/DmaapPublishers.java deleted file mode 100644 index a4c62719..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/DmaapPublishers.java +++ /dev/null @@ -1,91 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.commonFunction; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.cache.RemovalListener; -import java.io.IOException; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.List; -import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class DmaapPublishers { - - private static final Logger log = LoggerFactory.getLogger(DmaapPublishers.class); - private final LoadingCache<String, CambriaBatchingPublisher> publishers; - - private DmaapPublishers( - LoadingCache<String, CambriaBatchingPublisher> publishers) { - this.publishers = publishers; - } - - static DmaapPublishers create() { - return create(new CambriaPublisherFactory()); - } - - static DmaapPublishers create(final CambriaPublisherFactory publisherFactory) { - final LoadingCache<String, CambriaBatchingPublisher> cache = CacheBuilder.<String, CambriaBatchingPublisher>newBuilder() - .removalListener((RemovalListener<String, CambriaBatchingPublisher>) notification -> { - if (notification.getValue() != null) { - onCacheItemInvalidated(notification.getValue()); - } - }) - .build(new CacheLoader<String, CambriaBatchingPublisher>() { - @Override - public CambriaBatchingPublisher load(String streamId) - throws MalformedURLException, GeneralSecurityException { - try { - return publisherFactory.createCambriaPublisher(streamId); - } catch (MalformedURLException | GeneralSecurityException e) { - log.error("CambriaClientBuilders connection reader exception : streamID - " + streamId + " " - + e.getMessage()); - throw e; - } - } - }); - return new DmaapPublishers(cache); - } - - CambriaBatchingPublisher getByStreamId(String streamId) { - return publishers.getUnchecked(streamId); - } - - void closeByStreamId(String streamId) { - publishers.invalidate(streamId); - } - - private static void onCacheItemInvalidated(CambriaBatchingPublisher pub) { - try { - final List<?> stuck = pub.close(20, TimeUnit.SECONDS); - if (!stuck.isEmpty()) { - log.error(stuck.size() + " messages unsent"); - } - } catch (InterruptedException | IOException e) { - log.error("Caught Exception on Close event: {}", e); - } - } -} diff --git a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java index 06a27328..9d6ad360 100644 --- a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java +++ b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java @@ -27,6 +27,7 @@ import com.google.common.reflect.TypeToken; import com.google.gson.Gson;
import java.util.Map;
import org.json.JSONObject;
+import org.onap.dcae.commonFunction.event.publishing.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,8 +53,10 @@ class EventProcessor implements Runnable { static Map<String, String[]> streamidHash = new HashMap<>();
public JSONObject event;
+ private EventPublisher eventPublisher;
- EventProcessor() {
+ public EventProcessor(EventPublisher eventPublisher) {
+ this.eventPublisher = eventPublisher;
streamidHash = parseStreamIdToStreamHashMapping(CommonStartup.streamID);
}
@@ -128,7 +131,7 @@ class EventProcessor implements Runnable { for (String aStreamIdList : streamIdList) {
log.info("Invoking publisher for streamId:" + aStreamIdList);
this.overrideEvent();
- EventPublisherHash.getInstance().sendEvent(event, aStreamIdList);
+ eventPublisher.sendEvent(event, aStreamIdList);
}
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java b/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java deleted file mode 100644 index f3907126..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java +++ /dev/null @@ -1,85 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcae.commonFunction; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.clock.SaClock; -import com.att.nsa.logging.LoggingContext; -import com.att.nsa.logging.log4j.EcompFields; -import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; -import org.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class EventPublisherHash { - - private static final String VES_UNIQUE_ID = "VESuniqueId"; - private static final Logger log = LoggerFactory.getLogger(EventPublisherHash.class); - private static volatile EventPublisherHash instance = new EventPublisherHash(DmaapPublishers.create()); - private final DmaapPublishers dmaapPublishers; - - public static EventPublisherHash getInstance() { - return instance; - } - - @VisibleForTesting - EventPublisherHash(DmaapPublishers dmaapPublishers) { - this.dmaapPublishers = dmaapPublishers; - } - - void sendEvent(JSONObject event, String streamid) { - log.debug("EventPublisher.sendEvent: instance for publish is ready"); - clearVesUniqueId(event); - - try { - sendEventUsingCachedPublisher(streamid, event); - } catch (IOException | IllegalArgumentException e) { - log.error("Unable to publish event: {} streamID: {}. Exception: {}", event, streamid, e); - dmaapPublishers.closeByStreamId(streamid); - } - } - - private void clearVesUniqueId(JSONObject event) { - if (event.has(VES_UNIQUE_ID)) { - String uuid = event.get(VES_UNIQUE_ID).toString(); - LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); - localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - log.debug("Removing VESuniqueid object from event"); - event.remove(VES_UNIQUE_ID); - } - } - - private void sendEventUsingCachedPublisher(String streamid, JSONObject event) throws IOException { - int pendingMsgs = dmaapPublishers.getByStreamId(streamid).send("MyPartitionKey", event.toString()); - if (pendingMsgs > 100) { - log.info("Pending Message Count=" + pendingMsgs); - } - log.info("pub.send invoked - no error"); - CommonStartup.oplog.info(String.format("StreamID:%s Event Published:%s ", streamid, event)); - } - - @VisibleForTesting - CambriaBatchingPublisher getDmaapPublisher(String streamId) { - return dmaapPublishers.getByStreamId(streamId); - } -} diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java new file mode 100644 index 00000000..bef14448 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java @@ -0,0 +1,108 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction.event.publishing; + +import static io.vavr.API.List; +import static io.vavr.API.Try; +import static io.vavr.API.Tuple; +import static io.vavr.API.unchecked; +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError; +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; + +import io.vavr.collection.List; +import io.vavr.collection.Map; +import io.vavr.control.Option; +import io.vavr.control.Try; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import org.onap.dcae.commonFunction.AnyNode; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +@SuppressWarnings("mapFailure takes a generic varargs, unchecked because of Javas type system limitation, actually safe to do") +public final class DMaaPConfigurationParser { + + public static Try<Map<String, PublisherConfig>> parseToDomainMapping(Path configLocation) { + return readFromFile(configLocation) + .flatMap(DMaaPConfigurationParser::toJSON) + .flatMap(DMaaPConfigurationParser::toConfigMap); + } + + private static Try<String> readFromFile(Path configLocation) { + return Try(() -> new String(Files.readAllBytes(configLocation))) + .mapFailure(enhanceError(f("Could not read DMaaP configuration from location: '%s'", configLocation))); + } + + private static Try<AnyNode> toJSON(String config) { + return Try(() -> AnyNode.fromString(config)) + .mapFailure(enhanceError(f("DMaaP configuration '%s' is not a valid JSON document", config))); + } + + private static Try<Map<String, PublisherConfig>> toConfigMap(AnyNode config) { + return Try(() -> usesLegacyFormat(config) ? parseLegacyFormat(config) : parseNewFormat(config)) + .mapFailure(enhanceError( + f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config))); + } + + private static boolean usesLegacyFormat(AnyNode dMaaPConfig) { + return dMaaPConfig.has("channels"); + } + + private static Map<String, PublisherConfig> parseLegacyFormat(AnyNode root) { + return root.get("channels").toList().toMap( + channel -> channel.get("name").toString(), + channel -> { + String destinationsStr = channel.getAsOption("cambria.url") + .getOrElse(channel.getAsOption("cambria.hosts").get()) + .toString(); + String topic = channel.get("cambria.topic").toString(); + Option<String> maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString); + Option<String> maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString); + List<String> destinations = List(destinationsStr.split(",")); + return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations); + }); + } + + private static Map<String, PublisherConfig> parseNewFormat(AnyNode root) { + return root.keys().toMap( + channelName -> channelName, + channelName -> { + AnyNode channelConfig = root.get(channelName); + Option<String> maybeUser = channelConfig.getAsOption("aaf_username").map(AnyNode::toString); + Option<String> maybePassword = channelConfig.getAsOption("aaf_password").map(AnyNode::toString); + URL topicURL = unchecked( + () -> new URL(channelConfig.get("dmaap_info").get("topic_url").toString())).apply(); + String[] pathSegments = topicURL.getPath().substring(1).split("/"); + String topic = pathSegments[1]; + String destination = "events".equals(pathSegments[0]) ? topicURL.getAuthority() : topicURL.getHost(); + List<String> destinations = List(destination); + return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations); + }); + } + + private static PublisherConfig buildBasedOnAuth(Option<String> maybeUser, Option<String> maybePassword, + String topic, List<String> destinations) { + return maybeUser.flatMap(user -> maybePassword.map(password -> Tuple(user, password))) + .map(credentials -> new PublisherConfig(destinations, topic, credentials._1, credentials._2)) + .getOrElse(new PublisherConfig(destinations, topic)); + } +} diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java new file mode 100644 index 00000000..fd9b3ae1 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java @@ -0,0 +1,99 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.commonFunction.event.publishing; + +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.clock.SaClock; +import com.att.nsa.logging.LoggingContext; +import com.att.nsa.logging.log4j.EcompFields; +import io.vavr.collection.Map; +import io.vavr.control.Try; +import java.io.IOException; +import org.json.JSONObject; +import org.onap.dcae.commonFunction.VESLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +class DMaaPEventPublisher implements EventPublisher { + private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100; + private static final String VES_UNIQUE_ID = "VESuniqueId"; + private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class); + private final DMaaPPublishersCache publishersCache; + private final Logger outputLogger; + + DMaaPEventPublisher(DMaaPPublishersCache DMaaPPublishersCache, + Logger outputLogger) { + this.publishersCache = DMaaPPublishersCache; + this.outputLogger = outputLogger; + } + + @Override + public void sendEvent(JSONObject event, String domain) { + clearVesUniqueIdFromEvent(event); + publishersCache.getPublisher(domain) + .onEmpty(() -> + log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event))) + .forEach(publisher -> sendEvent(event, domain, publisher)); + } + + @Override + public void reconfigure(Map<String, PublisherConfig> dMaaPConfig) { + publishersCache.reconfigure(dMaaPConfig); + } + + private void sendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) { + Try.run(() -> uncheckedSendEvent(event, domain, publisher)) + .onFailure(exc -> closePublisher(event, domain, exc)); + } + + private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) + throws IOException { + int pendingMsgs = publisher.send("MyPartitionKey", event.toString()); + if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) { + log.info("Pending messages count: " + pendingMsgs); + } + String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, domain); + log.info(infoMsg); + outputLogger.info(infoMsg); + } + + private void closePublisher(JSONObject event, String domain, Throwable e) { + log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.", + event, domain), e); + publishersCache.closePublisherFor(domain); + } + + private void clearVesUniqueIdFromEvent(JSONObject event) { + if (event.has(VES_UNIQUE_ID)) { + String uuid = event.get(VES_UNIQUE_ID).toString(); + LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); + localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); + log.debug("Removing VESuniqueid object from event"); + event.remove(VES_UNIQUE_ID); + } + } +} diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java new file mode 100644 index 00000000..a7865a45 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java @@ -0,0 +1,62 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction.event.publishing; + +import static io.vavr.API.Try; +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError; +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; +import io.vavr.control.Try; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +final class DMaaPPublishersBuilder { + + @SuppressWarnings("mapFailure takes a generic varargs, unchecked because of Javas type system limitation, actually safe to do") + static Try<CambriaBatchingPublisher> buildPublisher(PublisherConfig config) { + return Try(() -> builder(config).build()) + .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config))); + } + + private static PublisherBuilder builder(PublisherConfig config) { + if (config.isSecured()) { + return authenticatedBuilder(config); + } else { + return unAuthenticatedBuilder(config); + } + } + + private static PublisherBuilder authenticatedBuilder(PublisherConfig config) { + return unAuthenticatedBuilder(config) + .usingHttps() + .authenticatedByHttp(config.userName().get(), config.password().get()); + } + + private static PublisherBuilder unAuthenticatedBuilder(PublisherConfig config) { + return new CambriaClientBuilders.PublisherBuilder() + .usingHosts(config.destinations().mkString(",")) + .onTopic(config.topic()) + .logSendFailuresAfter(5); + } +} diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java new file mode 100644 index 00000000..102d2774 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java @@ -0,0 +1,124 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction.event.publishing; + +import static io.vavr.API.Option; +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import io.vavr.collection.Map; +import io.vavr.control.Option; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nonnull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +class DMaaPPublishersCache { + + private static final Logger log = LoggerFactory.getLogger(DMaaPPublishersCache.class); + private final LoadingCache<String, CambriaBatchingPublisher> publishersCache; + private AtomicReference<Map<String, PublisherConfig>> dMaaPConfiguration; + + DMaaPPublishersCache(Map<String, PublisherConfig> dMaaPConfiguration) { + this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); + this.publishersCache = CacheBuilder.newBuilder() + .removalListener(new OnPublisherRemovalListener()) + .build(new CambriaPublishersCacheLoader()); + } + + DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader, + OnPublisherRemovalListener onPublisherRemovalListener, + Map<String, PublisherConfig> dMaaPConfiguration) { + this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); + this.publishersCache = CacheBuilder.newBuilder() + .removalListener(onPublisherRemovalListener) + .build(dMaaPPublishersCacheLoader); + } + + Option<CambriaBatchingPublisher> getPublisher(String streamID) { + try { + return Option(publishersCache.getUnchecked(streamID)); + } catch (Exception e) { + log.warn("Could not create / load Cambria Publisher for streamID", e); + return Option.none(); + } + } + + void closePublisherFor(String streamId) { + publishersCache.invalidate(streamId); + } + + synchronized void reconfigure(Map<String, PublisherConfig> newConfig) { + Map<String, PublisherConfig> currentConfig = dMaaPConfiguration.get(); + Map<String, PublisherConfig> removedConfigurations = currentConfig + .filterKeys(domain -> !newConfig.containsKey(domain)); + Map<String, PublisherConfig> changedConfigurations = newConfig + .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e))); + dMaaPConfiguration.set(newConfig); + removedConfigurations.merge(changedConfigurations).forEach(e -> publishersCache.invalidate(e._1)); + } + + static class OnPublisherRemovalListener implements RemovalListener<String, CambriaBatchingPublisher> { + + @Override + public void onRemoval(@Nonnull RemovalNotification<String, CambriaBatchingPublisher> notification) { + CambriaBatchingPublisher publisher = notification.getValue(); + if (publisher != null) { // The value might get Garbage Collected at this moment, regardless of @Nonnull + try { + int timeout = 20; + TimeUnit unit = TimeUnit.SECONDS; + java.util.List<?> stuck = publisher.close(timeout, unit); + if (!stuck.isEmpty()) { + log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', " + + "%s messages were dropped", stuck.size(), timeout, unit)); + } + } catch (InterruptedException | IOException e) { + log.error("Could not close Cambria publisher, some messages might have been dropped", e); + } + } + } + } + + class CambriaPublishersCacheLoader extends CacheLoader<String, CambriaBatchingPublisher> { + + @Override + public CambriaBatchingPublisher load(@Nonnull String domain) { + return dMaaPConfiguration.get() + .get(domain) + .toTry(() -> new RuntimeException( + f("DMaaP configuration contains no configuration for domain: '%s'", domain))) + .flatMap(DMaaPPublishersBuilder::buildPublisher) + .get(); + } + } + +} diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/EventPublisher.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/EventPublisher.java new file mode 100644 index 00000000..9cd718f8 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/EventPublisher.java @@ -0,0 +1,38 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction.event.publishing; + +import io.vavr.collection.Map; +import org.json.JSONObject; +import org.slf4j.Logger; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +public interface EventPublisher { + + static EventPublisher createPublisher(Logger outputLogger, Map<String, PublisherConfig> dMaaPConfig) { + return new DMaaPEventPublisher(new DMaaPPublishersCache(dMaaPConfig), outputLogger); + } + + void sendEvent(JSONObject event, String domain); + + void reconfigure(Map<String, PublisherConfig> dMaaPConfig); +} diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java new file mode 100644 index 00000000..4a056778 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java @@ -0,0 +1,98 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction.event.publishing; + +import io.vavr.collection.List; +import io.vavr.control.Option; +import java.util.Objects; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +public final class PublisherConfig { + + private final List<String> destinations; + private final String topic; + private String userName; + private String password; + + PublisherConfig(List<String> destinations, String topic) { + this.destinations = destinations; + this.topic = topic; + } + + PublisherConfig(List<String> destinations, String topic, String userName, String password) { + this.destinations = destinations; + this.topic = topic; + this.userName = userName; + this.password = password; + } + + List<String> destinations() { + return destinations; + } + + String topic() { + return topic; + } + + Option<String> userName() { + return Option.of(userName); + } + + Option<String> password() { + return Option.of(password); + } + + boolean isSecured() { + return userName().isDefined() && password().isDefined(); + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PublisherConfig that = (PublisherConfig) o; + return Objects.equals(destinations, that.destinations) && + Objects.equals(topic, that.topic) && + Objects.equals(userName, that.userName) && + Objects.equals(password, that.password); + } + + @Override + public int hashCode() { + return Objects.hash(destinations, topic, userName, password); + } + + @Override + public String toString() { + return "PublisherConfig{" + + "destinations=" + destinations + + ", topic='" + topic + '\'' + + ", userName='" + userName + '\'' + + ", password='" + password + '\'' + + '}'; + } +} diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java new file mode 100644 index 00000000..9bf3ef8c --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java @@ -0,0 +1,51 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction.event.publishing; + +import static io.vavr.API.$; + +import io.vavr.API; +import io.vavr.API.Match.Case; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +final class VavrUtils { + + private VavrUtils() { + // utils aggregator + } + + /** + * Shortcut for 'string interpolation' + */ + static String f(String msg, Object... args) { + return String.format(msg, args); + } + + /** + * Wrap failure with a more descriptive message of what has failed and chain original cause. Used to provide a + * context for errors instead of raw exception. + */ + static Case<Throwable, Throwable> enhanceError(String msg) { + return API.Case($(), e -> new RuntimeException(msg, e)); + } + +} diff --git a/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java b/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java deleted file mode 100644 index f4955ac8..00000000 --- a/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java +++ /dev/null @@ -1,144 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.commonFunction; - -import static org.hamcrest.CoreMatchers.allOf; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.junit.Assert.assertSame; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaPublisher; -import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.UncheckedExecutionException; -import java.io.IOException; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.concurrent.TimeUnit; -import org.hamcrest.BaseMatcher; -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - - -@RunWith(MockitoJUnitRunner.class) -public class DmaapPublishersTest { - - @Mock - private CambriaPublisherFactory publisherFactory; - @Mock - private CambriaBatchingPublisher cambriaPublisher; - private DmaapPublishers cut; - - @Rule - public final ExpectedException expectedException = ExpectedException.none(); - - @Before - public void setUp() throws MalformedURLException, GeneralSecurityException { - given(publisherFactory.createCambriaPublisher(anyString())).willReturn(cambriaPublisher); - cut = DmaapPublishers.create(publisherFactory); - } - - @Test - public void getByStreamIdShouldUseCachedItem() throws IOException, GeneralSecurityException { - // given - String streamId = "sampleStream"; - - // when - CambriaBatchingPublisher firstPublisher = cut.getByStreamId(streamId); - CambriaBatchingPublisher secondPublisher = cut.getByStreamId(streamId); - - // then - verify(publisherFactory, times(1)).createCambriaPublisher(streamId); - assertSame("should return same instance", firstPublisher, secondPublisher); - } - - @Test - public void getByStreamIdShouldHandleErrors() throws MalformedURLException, GeneralSecurityException { - // given - MalformedURLException exception = new MalformedURLException(); - given(publisherFactory.createCambriaPublisher(anyString())).willThrow(exception); - expectedException.expect(allOf( - instanceOf(UncheckedExecutionException.class), - causeIsInstanceOf(exception.getClass()))); - - // when - cut.getByStreamId("a stream"); - - // then - // exception should have been thrown - } - - @Test - public void closeByStreamIdShouldCloseConnection() throws IOException, InterruptedException { - // given - String streamId = "sampleStream"; - given(cambriaPublisher.close(anyLong(), any(TimeUnit.class))) - .willReturn(ImmutableList.of(new CambriaPublisher.message("p", "msg"))); - - // when - CambriaBatchingPublisher cachedPublisher = cut.getByStreamId(streamId); - cut.closeByStreamId(streamId); - - // then - assertSame("should return proper publisher", cambriaPublisher, cachedPublisher); - verify(cambriaPublisher).close(20, TimeUnit.SECONDS); - } - - @Test - public void closeByStreamIdShouldHandleErrors() throws IOException, InterruptedException { - // given - String streamId = "sampleStream"; - given(cambriaPublisher.close(anyLong(), any(TimeUnit.class))).willThrow(IOException.class); - - // when - CambriaBatchingPublisher cachedPublisher = cut.getByStreamId(streamId); - cut.closeByStreamId(streamId); - - // then - assertSame("should return proper publisher", cambriaPublisher, cachedPublisher); - verify(cambriaPublisher).close(20, TimeUnit.SECONDS); - } - - private Matcher<Exception> causeIsInstanceOf(final Class<?> clazz) { - return new BaseMatcher<Exception>() { - @Override - public boolean matches(Object o) { - return o instanceof Throwable && clazz.isInstance(((Throwable) o).getCause()); - } - - @Override - public void describeTo(Description description) { - description.appendText("exception cause should be an instance of " + clazz.getName()); - } - }; - } -}
\ No newline at end of file diff --git a/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java b/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java index 973ee014..e211c12a 100644 --- a/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java +++ b/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java @@ -20,17 +20,17 @@ */ package org.onap.dcae.commonFunction; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; import com.google.gson.Gson; +import java.util.concurrent.atomic.AtomicReference; import org.json.JSONObject; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; import java.util.List; +import org.onap.dcae.commonFunction.event.publishing.EventPublisher; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -52,7 +52,7 @@ public class EventProcessorTest { @Test public void testLoad() { //given - EventProcessor ec = new EventProcessor(); + EventProcessor ec = new EventProcessor(mock(EventPublisher.class)); ec.event = new org.json.JSONObject(ev); //when ec.overrideEvent(); @@ -65,7 +65,7 @@ public class EventProcessorTest { @Test public void shouldParseJsonEvents() throws ReflectiveOperationException { //given - EventProcessor eventProcessor = new EventProcessor(); + EventProcessor eventProcessor = new EventProcessor(mock(EventPublisher.class)); String event_json = "[{ \"filter\": {\"event.commonEventHeader.domain\":\"heartbeat\",\"VESversion\":\"v4\"},\"processors\":[" + "{\"functionName\": \"concatenateValue\",\"args\":{\"field\":\"event.commonEventHeader.eventName\",\"concatenate\": [\"$event.commonEventHeader.domain\",\"$event.commonEventHeader.eventType\",\"$event.faultFields.alarmCondition\"], \"delimiter\":\"_\"}}" + ",{\"functionName\": \"addAttribute\",\"args\":{\"field\": \"event.heartbeatFields.heartbeatFieldsVersion\",\"value\": \"1.0\",\"fieldType\": \"number\"}}" + @@ -84,32 +84,5 @@ public class EventProcessorTest { assertThat(stringArgumentCaptor.getAllValues()).contains("concatenateValue", "addAttribute", "map"); } - @Test - public void shouldCreateDmaapPublisher() { - - //given - EventPublisherHash eph = EventPublisherHash.getInstance(); - EventProcessor ec = new EventProcessor(); - ec.event = new org.json.JSONObject(ev); - CommonStartup.cambriaConfigFile = "src/test/resources/testDmaapConfig_ip.json"; - - //when - CambriaBatchingPublisher pub = eph.getDmaapPublisher("sec_fault_ueb"); - - //then - assertNotNull(pub); - } - - @Test - public void shouldSendEventWithNoError() { - - EventPublisherHash eph = EventPublisherHash.getInstance(); - EventProcessor eventProcessor = new EventProcessor(); - eventProcessor.event = new org.json.JSONObject(ev); - CommonStartup.cambriaConfigFile = "src/test/resources/testDmaapConfig_ip.json"; - - //when - eph.sendEvent(eventProcessor.event, "sec_fault_ueb"); - } } diff --git a/src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java b/src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java index e0fd5a42..12428024 100644 --- a/src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java +++ b/src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java @@ -22,6 +22,7 @@ package org.onap.dcae.commonFunction; import static java.util.Base64.getDecoder; import static java.util.Base64.getEncoder; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import com.att.nsa.cmdLine.NsaCommandLineUtil; @@ -43,6 +44,7 @@ import org.json.JSONObject; import org.junit.Test; import org.mockito.Mockito; import org.onap.dcae.commonFunction.CommonStartup.QueueFullException; +import org.onap.dcae.commonFunction.event.publishing.EventPublisher; import org.onap.dcae.restapi.RestfulCollectorServlet; @@ -79,7 +81,7 @@ public class TestCommonStartup { public void testParseStreamIdToStreamHashMapping() { // given CommonStartup.streamID = "fault=sec_fault|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert|voiceQuality=ves_voicequality|sipSignaling=ves_sipsignaling"; - EventProcessor eventProcessor = new EventProcessor(); + EventProcessor eventProcessor = new EventProcessor(mock(EventPublisher.class)); // when Map<String, String[]> streamHashMapping = EventProcessor.streamidHash; diff --git a/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParserTest.java b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParserTest.java new file mode 100644 index 00000000..5a94c662 --- /dev/null +++ b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParserTest.java @@ -0,0 +1,114 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction.event.publishing; + +import static io.vavr.API.List; +import static org.assertj.core.api.Assertions.assertThat; +import static org.onap.dcae.commonFunction.event.publishing.DMaaPConfigurationParser.parseToDomainMapping; + +import io.vavr.collection.Map; +import io.vavr.control.Try; +import java.nio.file.Path; +import java.nio.file.Paths; +import org.junit.Test; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +public class DMaaPConfigurationParserTest { + + @Test + public void testParseCredentialsForGen2() { + Path path = Paths.get("src/test/resources/testParseDMaaPCredentialsGen2.json"); + Try<Map<String, PublisherConfig>> publisherConfigs = parseToDomainMapping(path); + + PublisherConfig authCredentialsNulls = publisherConfigs.get().get("auth-credentials-null").getOrNull(); + assertThat(authCredentialsNulls.userName().isEmpty()).isTrue(); + assertThat(authCredentialsNulls.password().isEmpty()).isTrue(); + assertThat(authCredentialsNulls.isSecured()).isFalse(); + + PublisherConfig authCredentialsPresent = publisherConfigs.get().get("auth-credentials-present").getOrNull(); + assertThat(authCredentialsPresent.userName().getOrNull()).isEqualTo("sampleUser"); + assertThat(authCredentialsPresent.password().getOrNull()).isEqualTo("samplePassword"); + assertThat(authCredentialsPresent.isSecured()).isTrue(); + + PublisherConfig authCredentialsKeysMissing = publisherConfigs.get().get("auth-credentials-missing").getOrNull(); + assertThat(authCredentialsKeysMissing.userName().isEmpty()).isTrue(); + assertThat(authCredentialsKeysMissing.password().isEmpty()).isTrue(); + assertThat(authCredentialsKeysMissing.isSecured()).isFalse(); + } + + + @Test + public void testParseCredentialsForLegacy() { + Path path = Paths.get("src/test/resources/testParseDMaaPCredentialsLegacy.json"); + Try<Map<String, PublisherConfig>> publisherConfigs = parseToDomainMapping(path); + + PublisherConfig authCredentialsNull = publisherConfigs.get().get("auth-credentials-null").getOrNull(); + assertThat(authCredentialsNull.userName().isEmpty()).isTrue(); + assertThat(authCredentialsNull.password().isEmpty()).isTrue(); + assertThat(authCredentialsNull.isSecured()).isFalse(); + + PublisherConfig authCredentialsPresent = publisherConfigs.get().get("auth-credentials-present").getOrNull(); + assertThat(authCredentialsPresent.userName().getOrNull()).isEqualTo("sampleUser"); + assertThat(authCredentialsPresent.password().getOrNull()).isEqualTo("samplePassword"); + assertThat(authCredentialsPresent.isSecured()).isTrue(); + + PublisherConfig authCredentialsMissing = publisherConfigs.get().get("auth-credentials-missing").getOrNull(); + assertThat(authCredentialsMissing.userName().isEmpty()).isTrue(); + assertThat(authCredentialsMissing.password().isEmpty()).isTrue(); + assertThat(authCredentialsMissing.isSecured()).isFalse(); + } + + + @Test + public void testParseGen2() { + Path path = Paths.get("src/test/resources/testParseDMaaPGen2.json"); + Try<Map<String, PublisherConfig>> publisherConfigs = parseToDomainMapping(path); + + PublisherConfig withEventsSegment = publisherConfigs.get().get("event-segments-with-port").getOrNull(); + assertThat(withEventsSegment.destinations()).isEqualTo(List("UEBHOST:3904")); + assertThat(withEventsSegment.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV"); + + PublisherConfig withOtherSegment = publisherConfigs.get().get("other-segments-without-ports").getOrNull(); + assertThat(withOtherSegment.destinations()).isEqualTo(List("UEBHOST")); + assertThat(withOtherSegment.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV"); + } + + @Test + public void testParseLegacy() { + Path exemplaryConfig = Paths.get("src/test/resources/testParseDMaaPLegacy.json"); + Try<Map<String, PublisherConfig>> publisherConfigs = DMaaPConfigurationParser + .parseToDomainMapping(exemplaryConfig); + + PublisherConfig urlFirstThenHosts = publisherConfigs.get().get("url-precedes-hosts").getOrNull(); + assertThat(urlFirstThenHosts.destinations()).isEqualTo(List("127.0.0.1:3904")); + assertThat(urlFirstThenHosts.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV"); + + PublisherConfig urlKeyMissing = publisherConfigs.get().get("url-key-missing").getOrNull(); + assertThat(urlKeyMissing.destinations()).isEqualTo(List("h1.att.com", "h2.att.com")); + assertThat(urlKeyMissing.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV"); + + PublisherConfig urlIsMissing = publisherConfigs.get().get("url-is-null").getOrNull(); + assertThat(urlIsMissing.destinations()).isEqualTo(List("h1.att.com", "h2.att.com")); + assertThat(urlIsMissing.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV"); + } +}
\ No newline at end of file diff --git a/src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisherTest.java index 81c6556b..bbe5079e 100644 --- a/src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java +++ b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisherTest.java @@ -17,74 +17,73 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.commonFunction; +package org.onap.dcae.commonFunction.event.publishing; +import static io.vavr.API.Option; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.att.nsa.cambria.client.CambriaBatchingPublisher; import java.io.IOException; import org.json.JSONObject; import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; +import org.slf4j.Logger; -@RunWith(MockitoJUnitRunner.class) -public class EventPublisherHashTest { - private EventPublisherHash cut; +public class DMaaPEventPublisherTest { - @Mock - private DmaapPublishers dmaapPublishers; - @Mock + private static final String STREAM_ID = "sampleStreamId"; + + private DMaaPEventPublisher eventPublisher; private CambriaBatchingPublisher cambriaPublisher; + private DMaaPPublishersCache DMaaPPublishersCache; @Before public void setUp() { - given(dmaapPublishers.getByStreamId(anyString())).willReturn(cambriaPublisher); - - cut = new EventPublisherHash(dmaapPublishers); + cambriaPublisher = mock(CambriaBatchingPublisher.class); + DMaaPPublishersCache = mock(DMaaPPublishersCache.class); + when(DMaaPPublishersCache.getPublisher(anyString())).thenReturn(Option(cambriaPublisher)); + eventPublisher = new DMaaPEventPublisher(DMaaPPublishersCache, mock(Logger.class)); } @Test - public void sendEventShouldSendEventToATopic() throws Exception { + public void shouldSendEventToTopic() throws Exception { // given JSONObject event = new JSONObject("{}"); - final String streamId = "sampleStreamId"; // when - cut.sendEvent(event, streamId); + eventPublisher.sendEvent(event, STREAM_ID); // then verify(cambriaPublisher).send("MyPartitionKey", event.toString()); } @Test - public void sendEventShouldRemoveUuid() throws Exception { + public void shouldRemoveInternalVESUIDBeforeSending() throws Exception { // given - JSONObject event = new JSONObject("{\"VESuniqueId\": \"362e0146-ec5f-45f3-8d8f-bfe877c3f58e\", \"another\": 8}"); - final String streamId = "sampleStreamId"; + JSONObject event = new JSONObject( + "{\"VESuniqueId\": \"362e0146-ec5f-45f3-8d8f-bfe877c3f58e\", \"another\": 8}"); // when - cut.sendEvent(event, streamId); + eventPublisher.sendEvent(event, STREAM_ID); // then verify(cambriaPublisher).send("MyPartitionKey", new JSONObject("{\"another\": 8}").toString()); } @Test - public void sendEventShouldCloseConnectionWhenExceptionOccurred() throws Exception { + public void shouldCloseConnectionWhenExceptionOccurred() throws Exception { // given JSONObject event = new JSONObject("{}"); - final String streamId = "sampleStreamId"; given(cambriaPublisher.send(anyString(), anyString())).willThrow(new IOException("epic fail")); // when - cut.sendEvent(event, streamId); + eventPublisher.sendEvent(event, STREAM_ID); // then - verify(dmaapPublishers).closeByStreamId(streamId); + verify(DMaaPPublishersCache).closePublisherFor(STREAM_ID); } }
\ No newline at end of file diff --git a/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCacheTest.java b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCacheTest.java new file mode 100644 index 00000000..8dc69f62 --- /dev/null +++ b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCacheTest.java @@ -0,0 +1,126 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction.event.publishing; + +import static io.vavr.API.List; +import static io.vavr.API.Map; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import io.vavr.collection.Map; +import io.vavr.control.Option; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; +import org.onap.dcae.commonFunction.event.publishing.DMaaPPublishersCache.CambriaPublishersCacheLoader; +import org.onap.dcae.commonFunction.event.publishing.DMaaPPublishersCache.OnPublisherRemovalListener; + + +public class DMaaPPublishersCacheTest { + + private String streamId1; + private Map<String, PublisherConfig> dMaaPConfigs; + + @Before + public void setUp() { + streamId1 = "sampleStream1"; + dMaaPConfigs = Map("sampleStream1", new PublisherConfig(List("destination1"), "topic1")); + } + + @Test + public void shouldReturnTheSameCachedInstanceOnConsecutiveRetrievals() { + // given + DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs); + + // when + Option<CambriaBatchingPublisher> firstPublisher = dMaaPPublishersCache.getPublisher(streamId1); + Option<CambriaBatchingPublisher> secondPublisher = dMaaPPublishersCache.getPublisher(streamId1); + + // then + assertSame("should return same instance", firstPublisher.get(), secondPublisher.get()); + } + + @Test + public void shouldCloseCambriaPublisherOnCacheInvalidate() throws IOException, InterruptedException { + // given + CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class); + CambriaPublishersCacheLoader cacheLoaderMock = mock(CambriaPublishersCacheLoader.class); + DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock, + new OnPublisherRemovalListener(), + dMaaPConfigs); + when(cacheLoaderMock.load(streamId1)).thenReturn(cambriaPublisherMock1); + + // when + dMaaPPublishersCache.getPublisher(streamId1); + dMaaPPublishersCache.closePublisherFor(streamId1); + + // then + verify(cambriaPublisherMock1).close(20, TimeUnit.SECONDS); + + } + + @Test + public void shouldReturnNoneIfThereIsNoDMaaPConfigurationForGivenStreamID() { + // given + DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs); + + // then + assertTrue("should not exist", dMaaPPublishersCache.getPublisher("non-existing").isEmpty()); + } + + + @Test + public void shouldCloseOnlyChangedPublishers() throws IOException, InterruptedException { + // given + CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class); + CambriaBatchingPublisher cambriaPublisherMock2 = mock(CambriaBatchingPublisher.class); + CambriaPublishersCacheLoader cacheLoaderMock = mock(CambriaPublishersCacheLoader.class); + String firstDomain = "domain1"; + String secondDomain = "domain2"; + Map<String, PublisherConfig> oldConfig = Map(firstDomain, + new PublisherConfig(List("destination1"), "topic1"), + secondDomain, + new PublisherConfig(List("destination2"), "topic2", + "user", "pass")); + Map<String, PublisherConfig> newConfig = Map(firstDomain, new PublisherConfig(List("destination1"), "topic1"), + secondDomain, new PublisherConfig(List("destination2"), "topic2")); + DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock, + new OnPublisherRemovalListener(), + oldConfig); + when(cacheLoaderMock.load(firstDomain)).thenReturn(cambriaPublisherMock1); + when(cacheLoaderMock.load(secondDomain)).thenReturn(cambriaPublisherMock2); + + dMaaPPublishersCache.getPublisher(firstDomain); + dMaaPPublishersCache.getPublisher(secondDomain); + + // when + dMaaPPublishersCache.reconfigure(newConfig); + + // then + verify(cambriaPublisherMock2).close(20, TimeUnit.SECONDS); + verifyZeroInteractions(cambriaPublisherMock1); + } +}
\ No newline at end of file diff --git a/src/test/java/org/onap/dcae/vestest/AnyNodeTest.java b/src/test/java/org/onap/dcae/vestest/AnyNodeTest.java index 695f53c9..9400e46d 100644 --- a/src/test/java/org/onap/dcae/vestest/AnyNodeTest.java +++ b/src/test/java/org/onap/dcae/vestest/AnyNodeTest.java @@ -19,89 +19,45 @@ */ package org.onap.dcae.vestest; -import com.google.common.collect.ImmutableMap; +import static org.assertj.core.api.Assertions.assertThat; + import com.google.common.collect.Sets; -import org.json.JSONObject; +import java.util.Set; import org.junit.BeforeClass; import org.junit.Test; import org.onap.dcae.commonFunction.AnyNode; -import java.io.IOException; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - -import static org.assertj.core.api.Assertions.assertThat; - /** * Created by koblosz on 07.06.18. */ public class AnyNodeTest { - private static final String SAMPLE_JSON_FILEPATH = "src/test/resources/test_anynode_class.json"; - private static final Map<String, Object> EXPECTED_RAW_MAP = ImmutableMap.<String, Object>builder().put("a", 1).put("b", 2).build(); - private static final Set<String> EXPECTED_JSON_KEYS = Sets.newHashSet("channels", "sampleStrList", "sampleNestedObject", "sampleInt", "sampleString", "sampleNull"); + private static final String SAMPLE_JSON_FILEPATH = "{\n" + + " \"channels\": [{\n" + + " \"one\": \"number1\", \"two\": \"number2\", \"three\": \"number3\"}],\n" + + " \"sampleStrList\": [\"1\", \"2\", \"3\", \"4\", \"5\"],\n" + + " \"sampleNestedObject\": {\"a\": 1, \"b\": 2},\n" + + " \"sampleInt\": 1,\n" + + " \"sampleString\": \"str\",\n" + + " \"sampleNull\": null\n" + + "}\n"; + private static final Set<String> EXPECTED_JSON_KEYS = Sets + .newHashSet("channels", "sampleStrList", "sampleNestedObject", "sampleInt", "sampleString", "sampleNull"); private static AnyNode node; @BeforeClass - public static void setUpClass() throws IOException { - node = AnyNode.parse(SAMPLE_JSON_FILEPATH); - } - - @Test(expected = IOException.class) - public void testShouldRethrowExceptionWhenFileNotFound() throws IOException { - AnyNode.parse("not/existing/path"); + public static void setUpClass() { + node = AnyNode.fromString(SAMPLE_JSON_FILEPATH); } @Test public void testShouldReturnJsonObjectKeySet() { - assertThat(node.getKeys()).containsOnlyElementsOf(EXPECTED_JSON_KEYS); - } - - @Test - public void testShouldGetElementAsString() { - assertThat(node.get("sampleStrList").get(0).asString()).isEqualTo("1"); - } - - @Test - public void testShouldGetElementAsInt() { - assertThat(node.get("sampleInt").asInt()).isSameAs(1); - } - - @Test - public void testWhenNullValuePresentShouldReturnJsonObjectNullAsString() { - assertThat(node.get("sampleNull").asString()).isSameAs(JSONObject.NULL.toString()); + assertThat(node.keys()).containsOnlyElementsOf(EXPECTED_JSON_KEYS); } - @Test - public void testShouldGetJsonObjectAsStringToObjectMap() { - assertThat(node.get("sampleNestedObject").asRawMap()).containsAllEntriesOf(EXPECTED_RAW_MAP); - } - - @Test - public void testShouldGetAsMap() { - assertThat(node.asMap().keySet()).containsOnlyElementsOf(EXPECTED_JSON_KEYS); - } - - @Test - public void testShouldGetAsList() { - assertThat(node.get("sampleStrList").asList().stream().map(AnyNode::asString).collect(Collectors.toList())).containsExactly("1", "2", "3", "4", "5"); - } - - @Test - public void testShouldGetAsOptional() { - assertThat(node.getAsOptional("absentKey")).isNotPresent(); - } - - @Test - public void testWhenChainMethodsShouldReturnValue() { - assertThat(node.get("channels").get(0).get("two").asString()).isEqualTo("number2"); - } - - @Test(expected = ClassCastException.class) public void whenInvokedOnJsonObjInsteadOfJsonArrShouldRaiseRuntimeEx() { - node.asList(); + node.toList(); } }
\ No newline at end of file diff --git a/src/test/java/org/onap/dcae/vestest/DmaapPropertyReaderTest.java b/src/test/java/org/onap/dcae/vestest/DmaapPropertyReaderTest.java deleted file mode 100644 index 46f5da4b..00000000 --- a/src/test/java/org/onap/dcae/vestest/DmaapPropertyReaderTest.java +++ /dev/null @@ -1,120 +0,0 @@ -/*-
- * ============LICENSE_START=======================================================
- * org.onap.dcaegen2.collectors.ves
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018 Nokia. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.vestest;
-
-import com.google.common.collect.ImmutableMap;
-import org.junit.Test;
-import org.onap.dcae.commonFunction.DmaapPropertyReader;
-
-import java.util.Map;
-
-import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
-
-public class DmaapPropertyReaderTest {
-
-
- private static final String legacyConfigFilePath = "src/test/resources/testDmaapConfig_ip.json";
- private static final String dmaapInputConfigFilePath = "src/test/resources/testDmaapConfig_gen2.json";
- private static final String fullDmaapConfigWithChannels = "src/test/resources/testFullDmaapConfig_channels.json";
- private static final String fullGen2DmaapConfig = "src/test/resources/testFullDmaapConfig_gen2.json";
-
- private static final String FAULT_UEB_KEY_PREFIX = "sec_fault_ueb";
- private static final String VES_ALERT_SND_KEY_PREFIX = "ves-thresholdCrossingAlert-secondary";
- private static final String VES_FAULT_SECONDARY = "ves-fault-secondary";
-
- private static final String FAULT_BASIC_AUTH_USERNAME_KEY = VES_FAULT_SECONDARY + ".basicAuthUsername";
- private static final String ALERT_BASIC_AUTH_PWD_KEY = VES_ALERT_SND_KEY_PREFIX + ".basicAuthPassword";
-
- private static final String VES_ALERT_CAMBRIA_TOPIC_KEY = VES_ALERT_SND_KEY_PREFIX + ".cambria.topic";
- private static final String VES_ALERT_CAMBRIA_URL_KEY = VES_ALERT_SND_KEY_PREFIX + ".cambria.url";
- private static final String VES_FAULT_SND_CAMBRIA_URL_KEY = VES_FAULT_SECONDARY + ".cambria.url";
- private static final String VES_FAULT_SND_AUTH_PWD_KEY = VES_FAULT_SECONDARY + ".basicAuthPassword";
- private static final String VES_FAULT_SND_CAMBRIA_TOPIC_KEY = VES_FAULT_SECONDARY + ".cambria.topic";
- private static final String FAULT_UEB_CAMBRIA_HOSTS_KEY = FAULT_UEB_KEY_PREFIX + ".cambria.hosts";
- private static final String FAULT_UEB_CAMBRIA_TOPIC_KEY = FAULT_UEB_KEY_PREFIX + ".cambria.topic";
- private static final String VES_ALERT_SND_AUTH_USERNAME_KEY = VES_ALERT_SND_KEY_PREFIX + ".basicAuthUsername";
-
- private static final String NULL_TOSTRING = "null";
-
- private static final Map<String, String> expectedCompleteGen2DmaapConfig = ImmutableMap.<String, String>builder()
- .put(ALERT_BASIC_AUTH_PWD_KEY, "SamplePassWD2")
- .put(VES_ALERT_CAMBRIA_TOPIC_KEY, "DCAE-SE-COLLECTOR-EVENTS-DEV")
- .put(FAULT_BASIC_AUTH_USERNAME_KEY, "sampleUsername")
- .put(VES_ALERT_CAMBRIA_URL_KEY, "UEBHOST:3904")
- .put(VES_FAULT_SND_CAMBRIA_URL_KEY, "UEBHOST:3904")
- .put(VES_FAULT_SND_AUTH_PWD_KEY, "SamplePasswd")
- .put(VES_FAULT_SND_CAMBRIA_TOPIC_KEY, "DCAE-SE-COLLECTOR-EVENTS-DEV")
- .put(VES_ALERT_SND_AUTH_USERNAME_KEY, "sampleUsername2")
- .build();
-
- private static final Map<String, String> expectedIncompleteGen2DmaapConfig = ImmutableMap.<String, String>builder()
- .put(VES_ALERT_SND_AUTH_USERNAME_KEY, NULL_TOSTRING)
- .put(FAULT_BASIC_AUTH_USERNAME_KEY, NULL_TOSTRING)
- .put(VES_ALERT_CAMBRIA_TOPIC_KEY, "DCAE-SE-COLLECTOR-EVENTS-DEV")
- .put(VES_ALERT_CAMBRIA_URL_KEY, "UEBHOST:3904")
- .put(VES_FAULT_SND_CAMBRIA_URL_KEY, "UEBHOST:3904")
- .put(ALERT_BASIC_AUTH_PWD_KEY, NULL_TOSTRING)
- .put(VES_FAULT_SND_AUTH_PWD_KEY, NULL_TOSTRING)
- .put(VES_FAULT_SND_CAMBRIA_TOPIC_KEY, "DCAE-SE-COLLECTOR-EVENTS-DEV")
- .build();
-
- private static final Map<String, String> expectedCompleteChannelsDmaapConfig = ImmutableMap.<String, String>builder()
- .put(FAULT_UEB_CAMBRIA_HOSTS_KEY, "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com")
- .put(FAULT_UEB_CAMBRIA_TOPIC_KEY, "DCAE-SE-COLLECTOR-EVENTS-DEV")
- .put(FAULT_UEB_KEY_PREFIX + ".basicAuthPassword", "S0mEPassWD")
- .put(FAULT_UEB_KEY_PREFIX + ".basicAuthUsername", "sampleUser")
- .put(FAULT_UEB_KEY_PREFIX + ".cambria.url", "127.0.0.1:3904")
- .build();
-
- private static final Map<String, String> expectedIncompleteChannelsDmaapConfig = ImmutableMap.<String, String>builder()
- .put(FAULT_UEB_CAMBRIA_HOSTS_KEY, "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com")
- .put(FAULT_UEB_CAMBRIA_TOPIC_KEY, "DCAE-SE-COLLECTOR-EVENTS-DEV")
- .build();
-
- @Test
- public void testShouldCreateReaderWithAbsentParamsOmittedBasedOnChannelDmaapConfig() {
- assertReaderPreservedAllEntriesAfterTransformation(legacyConfigFilePath, expectedIncompleteChannelsDmaapConfig);
- }
-
- @Test
- public void testShouldCreateReaderWithAbsentParamsOmittedBasedOnGen2DmaapConfig() {
- assertReaderPreservedAllEntriesAfterTransformation(dmaapInputConfigFilePath, expectedIncompleteGen2DmaapConfig);
- }
-
- @Test
- public void shouldCreateReaderWithCompleteChannelDmaapConfig() {
- assertReaderPreservedAllEntriesAfterTransformation(fullDmaapConfigWithChannels, expectedCompleteChannelsDmaapConfig);
- }
-
- @Test
- public void shouldCreateReaderWithCompleteGen2DmaapConfig() {
- assertReaderPreservedAllEntriesAfterTransformation(fullGen2DmaapConfig, expectedCompleteGen2DmaapConfig);
- }
-
- private void assertReaderPreservedAllEntriesAfterTransformation(String dmaapConfigFilePath, Map<String, String> expectedMap) {
- DmaapPropertyReader reader = new DmaapPropertyReader(dmaapConfigFilePath);
-
- assertThat(reader.getDmaapProperties()).containsAllEntriesOf(expectedMap);
- assertThat(expectedMap).containsAllEntriesOf(reader.getDmaapProperties());
- }
-
-}
-
diff --git a/src/test/resources/testDmaapConfig_gen2.json b/src/test/resources/testDmaapConfig_gen2.json deleted file mode 100644 index d8097bf1..00000000 --- a/src/test/resources/testDmaapConfig_gen2.json +++ /dev/null @@ -1,24 +0,0 @@ -{ - "ves-fault-secondary": { - "aaf_username": null, - "dmaap_info": { - "topic_url": "http://UEBHOST:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV", - "location": "mtl5", - "client_id": null, - "client_role": null - }, - "type": "message_router", - "aaf_password": null - }, - "ves-thresholdCrossingAlert-secondary": { - "aaf_username": null, - "dmaap_info": { - "topic_url": "http://UEBHOST:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV", - "location": "mtl5", - "client_id": null, - "client_role": null - }, - "type": "message_router", - "aaf_password": null - } - }
\ No newline at end of file diff --git a/src/test/resources/testFullDmaapConfig_channels.json b/src/test/resources/testFullDmaapConfig_channels.json deleted file mode 100644 index e2bc0339..00000000 --- a/src/test/resources/testFullDmaapConfig_channels.json +++ /dev/null @@ -1,13 +0,0 @@ -{ - "channels": [ - { - "name": "sec_fault_ueb", - "cambria.url": "127.0.0.1:3904", - "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com", - "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV", - "basicAuthPassword": "S0mEPassWD", - "basicAuthUsername": "sampleUser", - "stripHpId": "true" - } - ] -}
\ No newline at end of file diff --git a/src/test/resources/testParseDMaaPCredentialsGen2.json b/src/test/resources/testParseDMaaPCredentialsGen2.json new file mode 100644 index 00000000..953cb6e8 --- /dev/null +++ b/src/test/resources/testParseDMaaPCredentialsGen2.json @@ -0,0 +1,21 @@ +{ + "auth-credentials-null": { + "aaf_username": null, + "dmaap_info": { + "topic_url": "http://UEBHOST:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV", + }, + "aaf_password": null + }, + "auth-credentials-present": { + "aaf_username": "sampleUser", + "dmaap_info": { + "topic_url": "http://UEBHOST:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV", + }, + "aaf_password": "samplePassword" + }, + "auth-credentials-missing": { + "dmaap_info": { + "topic_url": "http://UEBHOST:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV", + } + } +}
\ No newline at end of file diff --git a/src/test/resources/testParseDMaaPCredentialsLegacy.json b/src/test/resources/testParseDMaaPCredentialsLegacy.json new file mode 100644 index 00000000..ca59c7e7 --- /dev/null +++ b/src/test/resources/testParseDMaaPCredentialsLegacy.json @@ -0,0 +1,26 @@ +{ + "channels": [ + { + "name": "auth-credentials-null", + "cambria.url": "127.0.0.1:3904", + "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com", + "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV", + "basicAuthPassword": null, + "basicAuthUsername": null, + }, + { + "name": "auth-credentials-present", + "cambria.url": "127.0.0.1:3904", + "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com", + "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV", + "basicAuthPassword": "samplePassword", + "basicAuthUsername": "sampleUser", + }, + { + "name": "auth-credentials-missing", + "cambria.url": "127.0.0.1:3904", + "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com", + "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV", + } + ] +}
\ No newline at end of file diff --git a/src/test/resources/testParseDMaaPGen2.json b/src/test/resources/testParseDMaaPGen2.json new file mode 100644 index 00000000..5b4fe6a6 --- /dev/null +++ b/src/test/resources/testParseDMaaPGen2.json @@ -0,0 +1,12 @@ +{ + "event-segments-with-port": { + "dmaap_info": { + "topic_url": "http://UEBHOST:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV", + } + }, + "other-segments-without-ports": { + "dmaap_info": { + "topic_url": "http://UEBHOST:3904/somethingHere/DCAE-SE-COLLECTOR-EVENTS-DEV", + } + } +}
\ No newline at end of file diff --git a/src/test/resources/testParseDMaaPLegacy.json b/src/test/resources/testParseDMaaPLegacy.json new file mode 100644 index 00000000..9661e30c --- /dev/null +++ b/src/test/resources/testParseDMaaPLegacy.json @@ -0,0 +1,21 @@ +{ + "channels": [ + { + "name": "url-precedes-hosts", + "cambria.url": "127.0.0.1:3904", + "cambria.hosts": "h1.att.com,h2.att.com", + "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV", + }, + { + "name": "url-key-missing", + "cambria.hosts": "h1.att.com,h2.att.com", + "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV", + }, + { + "name": "url-is-null", + "cambria.url": null, + "cambria.hosts": "h1.att.com,h2.att.com", + "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV" + } + ] +}
\ No newline at end of file |