From cd66181b35300f020f197bb411d6bdf6ad2514fb Mon Sep 17 00:00:00 2001 From: PawelSzalapski Date: Tue, 26 Jun 2018 15:16:41 +0200 Subject: Prepare codebase for dynamic DMaaP configuration From now on, there is only one single place where we can create whole app core concerning sending events and it has a single entry point, based on DMaaP configuration. It can be used to rebuild part of app that is responsible for sending events dynamically. Changes are in scope for the dynamic DMaaP config feature. + bumped up code coverage a bit Change-Id: Iecc8c4e534ae9b781f47e3616409271ba83169c8 Signed-off-by: PawelSzalapski Issue-ID: DCAEGEN2-517 --- pom.xml | 6 + .../java/org/onap/dcae/commonFunction/AnyNode.java | 157 +++------ .../commonFunction/CambriaPublisherFactory.java | 64 ---- .../onap/dcae/commonFunction/CommonStartup.java | 373 ++++++++++----------- .../dcae/commonFunction/DmaapPropertyReader.java | 143 -------- .../onap/dcae/commonFunction/DmaapPublishers.java | 91 ----- .../onap/dcae/commonFunction/EventProcessor.java | 7 +- .../dcae/commonFunction/EventPublisherHash.java | 85 ----- .../event/publishing/DMaaPConfigurationParser.java | 108 ++++++ .../event/publishing/DMaaPEventPublisher.java | 99 ++++++ .../event/publishing/DMaaPPublishersBuilder.java | 62 ++++ .../event/publishing/DMaaPPublishersCache.java | 124 +++++++ .../event/publishing/EventPublisher.java | 38 +++ .../event/publishing/PublisherConfig.java | 98 ++++++ .../commonFunction/event/publishing/VavrUtils.java | 51 +++ .../dcae/commonFunction/DmaapPublishersTest.java | 144 -------- .../dcae/commonFunction/EventProcessorTest.java | 35 +- .../commonFunction/EventPublisherHashTest.java | 90 ----- .../dcae/commonFunction/TestCommonStartup.java | 4 +- .../publishing/DMaaPConfigurationParserTest.java | 114 +++++++ .../event/publishing/DMaaPEventPublisherTest.java | 89 +++++ .../event/publishing/DMaaPPublishersCacheTest.java | 126 +++++++ .../java/org/onap/dcae/vestest/AnyNodeTest.java | 80 +---- .../onap/dcae/vestest/DmaapPropertyReaderTest.java | 120 ------- src/test/resources/testDmaapConfig_gen2.json | 24 -- .../resources/testFullDmaapConfig_channels.json | 13 - .../resources/testParseDMaaPCredentialsGen2.json | 21 ++ .../resources/testParseDMaaPCredentialsLegacy.json | 26 ++ src/test/resources/testParseDMaaPGen2.json | 12 + src/test/resources/testParseDMaaPLegacy.json | 21 ++ 30 files changed, 1255 insertions(+), 1170 deletions(-) delete mode 100644 src/main/java/org/onap/dcae/commonFunction/CambriaPublisherFactory.java delete mode 100644 src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java delete mode 100644 src/main/java/org/onap/dcae/commonFunction/DmaapPublishers.java delete mode 100644 src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java create mode 100644 src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java create mode 100644 src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java create mode 100644 src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java create mode 100644 src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java create mode 100644 src/main/java/org/onap/dcae/commonFunction/event/publishing/EventPublisher.java create mode 100644 src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java create mode 100644 src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java delete mode 100644 src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java delete mode 100644 src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java create mode 100644 src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParserTest.java create mode 100644 src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisherTest.java create mode 100644 src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCacheTest.java delete mode 100644 src/test/java/org/onap/dcae/vestest/DmaapPropertyReaderTest.java delete mode 100644 src/test/resources/testDmaapConfig_gen2.json delete mode 100644 src/test/resources/testFullDmaapConfig_channels.json create mode 100644 src/test/resources/testParseDMaaPCredentialsGen2.json create mode 100644 src/test/resources/testParseDMaaPCredentialsLegacy.json create mode 100644 src/test/resources/testParseDMaaPGen2.json create mode 100644 src/test/resources/testParseDMaaPLegacy.json diff --git a/pom.xml b/pom.xml index b2e9b856..b534379f 100644 --- a/pom.xml +++ b/pom.xml @@ -386,6 +386,12 @@ limitations under the License. mail 1.4.7 + + io.vavr + vavr + 0.9.2 + + diff --git a/src/main/java/org/onap/dcae/commonFunction/AnyNode.java b/src/main/java/org/onap/dcae/commonFunction/AnyNode.java index 267c87a9..97d73ddd 100644 --- a/src/main/java/org/onap/dcae/commonFunction/AnyNode.java +++ b/src/main/java/org/onap/dcae/commonFunction/AnyNode.java @@ -19,164 +19,95 @@ */ package org.onap.dcae.commonFunction; +import static io.vavr.API.Set; + +import io.vavr.collection.List; +import io.vavr.collection.Set; +import io.vavr.control.Option; +import java.util.stream.StreamSupport; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; -import org.json.JSONTokener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; /** - * This class is a wrapper for 2 most used entities of org.json lib: JSONArray and JSONObject and - * comprises utility methods for fast access of json structures without need to explicitly coerce between them. - * While using this, bear in mind it does not contain exception handling - it is assumed that when using, the parsed json structure is known. + * This class is a wrapper for 2 most used entities of org.json lib: JSONArray and JSONObject and comprises utility + * methods for fast access of json structures without need to explicitly coerce between them. While using this, bear in + * mind it does not contain exception handling - it is assumed that when using, the parsed json structure is known. * * @author koblosz */ public class AnyNode { - private final Object obj; - private static final Logger log = LoggerFactory.getLogger(AnyNode.class); - public static AnyNode parse(String filePath) throws IOException { - try (FileReader fr = new FileReader(filePath)) { - return new AnyNode(new JSONObject(new JSONTokener(fr))); - } catch (FileNotFoundException | JSONException e1) { - log.error("Could not find or parse file under path %s due to: %s", filePath, e1.toString()); - e1.printStackTrace(); - throw e1; - } - } + private Object obj; - /** - * Returns key set of underlying object. It is assumed that underlying object is of type org.json.JSONObject. - * - * @return Set of string keys present in underlying JSONObject - */ - public Set getKeys() { - return asJsonObject().keySet(); + private AnyNode(Object object) { + this.obj = object; } - /** - * Returns value associated with specified key wrapped with AnyValue object. It is assumed that this is of type org.json.JSONObject. - * - * @param key A key string - * @return The AnyNode object associated with given key. - */ - public AnyNode get(String key) { - return new AnyNode(asJsonObject().get(key)); + public static AnyNode fromString(String content) { + return new AnyNode(new JSONObject(content)); } /** - * Returns value under specified index wrapped with AnyValue object. It is assumed that this is of type org.json.JSONArray. - * - * @param idx An index of JSONArray - * @return The AnyNode object associated with given index. + * Returns key set of underlying object. It is assumed that underlying object is of type org.json.JSONObject. */ - public AnyNode get(int idx) { - return new AnyNode(asJsonArray().get(idx)); + public Set keys() { + return Set(asJsonObject().keySet().toArray(new String[]{})); } /** - * Returns int assuming this can be coerced to int. + * Returns value associated with specified key wrapped with AnyValue object. It is assumed that this is of type + * org.json.JSONObject. */ - public int asInt() { - return (int) this.obj; + public AnyNode get(String key) { + return new AnyNode(asJsonObject().get(key)); } /** - * Returns string representation of this. If it happens to have null, the value is treated as org.json.JSONObject.NULL and "null" string is returned then. - * - * @return A String + * Returns string representation of this. If it happens to have null, the value is treated as + * org.json.JSONObject.NULL and "null" string is returned then. */ - public String asString() { - return this.obj != JSONObject.NULL ? (String) this.obj : JSONObject.NULL.toString(); - } - public String toString() { return this.obj.toString(); } /** - * Converts underlying object to String-to-Object map. It is assumed that underlying object is of type org.json.JSONObject. - * - * @return A map. + * Returns optional of object under specified key, wrapped with AnyNode object. + * If underlying object is not of type org.json.JSONObject + * or underlying object has no given key + * or given key is null + * then Optional.empty will be returned. */ - public Map asRawMap() { - return asJsonObject().toMap(); - } - - /** - * Returns optional of object under specified key, wrapped with AnyNode object. If underlying object is not of type org.json.JSONObject, then Optional.empty will be returned. - * - * @param key A key string - */ - public Optional getAsOptional(String key) { - AnyNode result = null; + public Option getAsOption(String key) { try { - result = get(key); - } catch (JSONException ignored) { + AnyNode value = get(key); + if (value.toString().equals("null")) { + return Option.none(); + } + return Option.some(value); + } catch (JSONException ex) { + return Option.none(); } - return Optional.ofNullable(result); - } - - private JSONObject asJsonObject() { - return (JSONObject) this.obj; - } - - /** - * Converts underlying object to map representation with map values wrapped with AnyNode object. It is assumed that underlying object is of type org.json.JSONObject. - */ - public Map asMap() { - Map map = new HashMap<>(); - getKeys().forEach(key -> map.put(key, get(key))); - return map; - } - - /** - * Converts underlying object to map representation with map values wrapped with AnyNode object. It is assumed that underlying object is of type org.json.JSONObject. - */ - public java.util.List asList() { - return asStream().collect(Collectors.toList()); } /** - * Converts this object to stream of underlying objects wrapped with AnyNode class. It is assumed that this is of type JSONArray. + * Converts underlying object to map representation with map values wrapped with AnyNode object. It is assumed that + * underlying object is of type org.json.JSONObject. */ - private Stream asStream() { - return StreamSupport.stream(((JSONArray) this.obj).spliterator(), false).map(AnyNode::new); + public List toList() { + return List.ofAll(StreamSupport.stream(((JSONArray) this.obj).spliterator(), false).map(AnyNode::new)); } /** * Checks if specified key is present in this. It is assumed that this is of type JSONObject. */ - boolean hasKey(String key) { - return getAsOptional(key).isPresent(); - } - - /** - * Returns empty AnyNode (with null inside) - */ - public static AnyNode nullValue() { - return new AnyNode(JSONObject.NULL.toString()); + public boolean has(String key) { + return !getAsOption(key).isEmpty(); } - private JSONArray asJsonArray() { - return (JSONArray) this.obj; + private JSONObject asJsonObject() { + return (JSONObject) this.obj; } - private AnyNode(Object object) { - this.obj = object; - } } diff --git a/src/main/java/org/onap/dcae/commonFunction/CambriaPublisherFactory.java b/src/main/java/org/onap/dcae/commonFunction/CambriaPublisherFactory.java deleted file mode 100644 index 79109c04..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/CambriaPublisherFactory.java +++ /dev/null @@ -1,64 +0,0 @@ -package org.onap.dcae.commonFunction; -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class CambriaPublisherFactory { - - private final static Logger log = LoggerFactory.getLogger(CambriaPublisherFactory.class); - - CambriaBatchingPublisher createCambriaPublisher(String streamId) - throws MalformedURLException, GeneralSecurityException { - String authpwd = null; - DmaapPropertyReader reader = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile); - Map dMaaPProperties = reader.getDmaapProperties(); - String ueburl = dMaaPProperties.get(streamId + ".cambria.url"); - - if (ueburl == null) { - ueburl = dMaaPProperties.get(streamId + ".cambria.hosts"); - } - String topic = reader.getKeyValue(streamId + ".cambria.topic"); - String authuser = reader.getKeyValue(streamId + ".basicAuthUsername"); - - if (authuser != null) { - authpwd = dMaaPProperties.get(streamId + ".basicAuthPassword"); - } - - if ((authuser != null) && (authpwd != null)) { - log.debug(String.format("URL:%sTOPIC:%sAuthUser:%sAuthpwd:%s", ueburl, topic, authuser, authpwd)); - return new CambriaClientBuilders.PublisherBuilder().usingHosts(ueburl).onTopic(topic).usingHttps() - .authenticatedByHttp(authuser, authpwd).logSendFailuresAfter(5) - .build(); - } else { - log.debug(String.format("URL:%sTOPIC:%s", ueburl, topic)); - return new CambriaClientBuilders.PublisherBuilder().usingHosts(ueburl).onTopic(topic) - .logSendFailuresAfter(5) - .build(); - } - } -} diff --git a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java index b60104ea..3469531e 100644 --- a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java +++ b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -39,15 +39,6 @@ import com.github.fge.jsonschema.core.report.ProcessingMessage; import com.github.fge.jsonschema.core.report.ProcessingReport; import com.github.fge.jsonschema.main.JsonSchema; import com.github.fge.jsonschema.main.JsonSchemaFactory; - -import org.apache.catalina.LifecycleException; -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; -import org.onap.dcae.restapi.RestfulCollectorServlet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.net.URL; import java.nio.file.Files; @@ -58,185 +49,193 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import org.apache.catalina.LifecycleException; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.onap.dcae.commonFunction.event.publishing.DMaaPConfigurationParser; +import org.onap.dcae.commonFunction.event.publishing.EventPublisher; +import org.onap.dcae.restapi.RestfulCollectorServlet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CommonStartup extends NsaBaseEndpoint implements Runnable { - private static final String KCONFIG = "c"; - private static final String KSETTING_PORT = "collector.service.port"; - private static final int KDEFAULT_PORT = 8080; - private static final String KSETTING_SECUREPORT = "collector.service.secure.port"; - private static final int KDEFAULT_SECUREPORT = -1; - private static final String KSETTING_KEYSTOREPASSFILE = "collector.keystore.passwordfile"; - private static final String KDEFAULT_KEYSTOREPASSFILE = "../etc/passwordfile"; - private static final String KSETTING_KEYSTOREFILE = "collector.keystore.file.location"; - private static final String KDEFAULT_KEYSTOREFILE = "../etc/keystore"; - private static final String KSETTING_KEYALIAS = "collector.keystore.alias"; - private static final String KDEFAULT_KEYALIAS = "tomcat"; - private static final String KSETTING_DMAAPCONFIGS = "collector.dmaapfile"; - private static final String[] KDEFAULT_DMAAPCONFIGS = new String[] { "/etc/DmaapConfig.json" }; - private static final String KSETTING_SCHEMAVALIDATOR = "collector.schema.checkflag"; - private static final int KDEFAULT_SCHEMAVALIDATOR = -1; - private static final String KSETTING_SCHEMAFILE = "collector.schema.file"; - private static final String KDEFAULT_SCHEMAFILE = "{\"v5\":\"./etc/CommonEventFormat_28.3.json\"}"; - private static final String KSETTING_DMAAPSTREAMID = "collector.dmaap.streamid"; - private static final String KSETTING_AUTHFLAG = "header.authflag"; - private static final int KDEFAULT_AUTHFLAG = 0; - private static final String KSETTING_EVENTTRANSFORMFLAG = "event.transform.flag"; - private static final int KDEFAULT_EVENTTRANSFORMFLAG = 1; - private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics"); - public static final Logger inlog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.input"); - static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.output"); - public static final Logger eplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.error"); - - public static final String KSETTING_AUTHLIST = "header.authlist"; - static final int KDEFAULT_MAXQUEUEDEVENTS = 1024 * 4; - public static int schemaValidatorflag = -1; - public static int authflag = 1; - static int eventTransformFlag = 1; - public static JSONObject schemaFileJson; - static String cambriaConfigFile; - public static String streamID; - - static LinkedBlockingQueue fProcessingInputQueue; - private static ApiServer fTomcatServer = null; - private static final Logger log = LoggerFactory.getLogger(CommonStartup.class); - - private CommonStartup(rrNvReadable settings) throws loadException, IOException, rrNvReadable.missingReqdSetting { - final List connectors = new LinkedList<>(); - - if (settings.getInt(KSETTING_PORT, KDEFAULT_PORT) > 0) { - connectors.add(new ApiServerConnector.Builder(settings.getInt(KSETTING_PORT, KDEFAULT_PORT)).secure(false) - .build()); - } - - final int securePort = settings.getInt(KSETTING_SECUREPORT, KDEFAULT_SECUREPORT); - final String keystoreFile = settings.getString(KSETTING_KEYSTOREFILE, KDEFAULT_KEYSTOREFILE); - final String keystorePasswordFile = settings.getString(KSETTING_KEYSTOREPASSFILE, KDEFAULT_KEYSTOREPASSFILE); - final String keyAlias = settings.getString(KSETTING_KEYALIAS, KDEFAULT_KEYALIAS); - - if (securePort > 0) { - String keystorePassword = readFile(keystorePasswordFile); - connectors.add(new ApiServerConnector.Builder(securePort).secure(true) - .keystorePassword(keystorePassword).keystoreFile(keystoreFile).keyAlias(keyAlias).build()); - - } - - schemaValidatorflag = settings.getInt(KSETTING_SCHEMAVALIDATOR, KDEFAULT_SCHEMAVALIDATOR); - if (schemaValidatorflag > 0) { - String schemaFile = settings.getString(KSETTING_SCHEMAFILE, KDEFAULT_SCHEMAFILE); - schemaFileJson = new JSONObject(schemaFile); - - } - authflag = settings.getInt(CommonStartup.KSETTING_AUTHFLAG, CommonStartup.KDEFAULT_AUTHFLAG); - String[] currentConfigFile = settings.getStrings(KSETTING_DMAAPCONFIGS, KDEFAULT_DMAAPCONFIGS); - cambriaConfigFile = currentConfigFile[0]; - streamID = settings.getString(KSETTING_DMAAPSTREAMID, null); - eventTransformFlag = settings.getInt(KSETTING_EVENTTRANSFORMFLAG, KDEFAULT_EVENTTRANSFORMFLAG); - - fTomcatServer = new ApiServer.Builder(connectors, new RestfulCollectorServlet(settings)).encodeSlashes(true) - .name("collector").build(); - } - - public static void main(String[] args) { - try { - final Map argMap = NsaCommandLineUtil.processCmdLine(args, true); - final String config = NsaCommandLineUtil.getSetting(argMap, KCONFIG, "collector.properties"); - final URL settingStream = DrumlinServlet.findStream(config, CommonStartup.class); - - final nvReadableStack settings = new nvReadableStack(); - settings.push(new nvPropertiesFile(settingStream)); - settings.push(new nvReadableTable(argMap)); - - fProcessingInputQueue = new LinkedBlockingQueue<>(CommonStartup.KDEFAULT_MAXQUEUEDEVENTS); - - VESLogger.setUpEcompLogging(); - - CommonStartup cs = new CommonStartup(settings); - - Thread commonStartupThread = new Thread(cs); - commonStartupThread.start(); - - EventProcessor ep = new EventProcessor(); - ExecutorService executor = Executors.newFixedThreadPool(20); - for (int i = 0; i < 20; ++i) { - executor.execute(ep); - } - - } catch (loadException | missingReqdSetting | IOException e) { - CommonStartup.eplog.error("FATAL_STARTUP_ERROR" + e.getMessage()); - throw new RuntimeException(e); - } catch (Exception e) { - System.err.println("Uncaught exception - " + e.getMessage()); - CommonStartup.eplog.error("FATAL_ERROR" + e.getMessage()); - e.printStackTrace(System.err); - } - } - - public void run() { - try { - fTomcatServer.start(); - fTomcatServer.await(); - } catch (LifecycleException | IOException e) { - throw new RuntimeException(e); - } - } - - public static class QueueFullException extends Exception { - private static final long serialVersionUID = 1L; - } - - public static void handleEvents(JSONArray a) throws QueueFullException, JSONException { - CommonStartup.metriclog.info("EVENT_PUBLISH_START"); - for (int i = 0; i < a.length(); i++) { - if (!fProcessingInputQueue.offer(a.getJSONObject(i))) { - throw new QueueFullException(); - } - } - log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); - CommonStartup.metriclog.info("EVENT_PUBLISH_END"); - } - - private static String readFile(String path) throws IOException { - byte[] encoded = Files.readAllBytes(Paths.get(path)); - String pwd = new String(encoded); - return pwd.substring(0, pwd.length() - 1); - } - - public static String validateAgainstSchema(String jsonData, String jsonSchema) { - ProcessingReport report; - String result = "false"; - - try { - log.trace("Schema validation for event:" + jsonData); - JsonNode schemaNode = JsonLoader.fromString(jsonSchema); - JsonNode data = JsonLoader.fromString(jsonData); - JsonSchemaFactory factory = JsonSchemaFactory.byDefault(); - JsonSchema schema = factory.getJsonSchema(schemaNode); - report = schema.validate(data); - } catch (JsonParseException e) { - log.error("validateAgainstSchema:JsonParseException for event:" + jsonData); - return e.getMessage(); - } catch (ProcessingException e) { - log.error("validateAgainstSchema:Processing exception for event:" + jsonData); - return e.getMessage(); - } catch (IOException e) { - log.error( - "validateAgainstSchema:IO exception; something went wrong trying to read json data for event:" + jsonData); - return e.getMessage(); - } - if (report != null) { - for (ProcessingMessage pm : report) { - log.trace("Processing Message: " + pm.getMessage()); - } - result = String.valueOf(report.isSuccess()); - } - try { - log.debug("Validation Result:" + result + " Validation report:" + report); - } catch (NullPointerException e) { - log.error("validateAgainstSchema:NullpointerException on report"); - } - return result; - } + private static final String KCONFIG = "c"; + private static final String KSETTING_PORT = "collector.service.port"; + private static final int KDEFAULT_PORT = 8080; + private static final String KSETTING_SECUREPORT = "collector.service.secure.port"; + private static final int KDEFAULT_SECUREPORT = -1; + private static final String KSETTING_KEYSTOREPASSFILE = "collector.keystore.passwordfile"; + private static final String KDEFAULT_KEYSTOREPASSFILE = "../etc/passwordfile"; + private static final String KSETTING_KEYSTOREFILE = "collector.keystore.file.location"; + private static final String KDEFAULT_KEYSTOREFILE = "../etc/keystore"; + private static final String KSETTING_KEYALIAS = "collector.keystore.alias"; + private static final String KDEFAULT_KEYALIAS = "tomcat"; + private static final String KSETTING_DMAAPCONFIGS = "collector.dmaapfile"; + private static final String[] KDEFAULT_DMAAPCONFIGS = new String[]{"/etc/DmaapConfig.json"}; + private static final String KSETTING_SCHEMAVALIDATOR = "collector.schema.checkflag"; + private static final int KDEFAULT_SCHEMAVALIDATOR = -1; + private static final String KSETTING_SCHEMAFILE = "collector.schema.file"; + private static final String KDEFAULT_SCHEMAFILE = "{\"v5\":\"./etc/CommonEventFormat_28.3.json\"}"; + private static final String KSETTING_DMAAPSTREAMID = "collector.dmaap.streamid"; + private static final String KSETTING_AUTHFLAG = "header.authflag"; + private static final int KDEFAULT_AUTHFLAG = 0; + private static final String KSETTING_EVENTTRANSFORMFLAG = "event.transform.flag"; + private static final int KDEFAULT_EVENTTRANSFORMFLAG = 1; + private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics"); + public static final Logger inlog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.input"); + static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.output"); + public static final Logger eplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.error"); + + public static final String KSETTING_AUTHLIST = "header.authlist"; + static final int KDEFAULT_MAXQUEUEDEVENTS = 1024 * 4; + public static int schemaValidatorflag = -1; + public static int authflag = 1; + static int eventTransformFlag = 1; + public static JSONObject schemaFileJson; + static String cambriaConfigFile; + public static String streamID; + + static LinkedBlockingQueue fProcessingInputQueue; + private static ApiServer fTomcatServer = null; + private static final Logger log = LoggerFactory.getLogger(CommonStartup.class); + + private CommonStartup(rrNvReadable settings) throws loadException, IOException, rrNvReadable.missingReqdSetting { + final List connectors = new LinkedList<>(); + + if (settings.getInt(KSETTING_PORT, KDEFAULT_PORT) > 0) { + connectors.add(new ApiServerConnector.Builder(settings.getInt(KSETTING_PORT, KDEFAULT_PORT)).secure(false) + .build()); + } + + final int securePort = settings.getInt(KSETTING_SECUREPORT, KDEFAULT_SECUREPORT); + final String keystoreFile = settings.getString(KSETTING_KEYSTOREFILE, KDEFAULT_KEYSTOREFILE); + final String keystorePasswordFile = settings.getString(KSETTING_KEYSTOREPASSFILE, KDEFAULT_KEYSTOREPASSFILE); + final String keyAlias = settings.getString(KSETTING_KEYALIAS, KDEFAULT_KEYALIAS); + + if (securePort > 0) { + String keystorePassword = readFile(keystorePasswordFile); + connectors.add(new ApiServerConnector.Builder(securePort).secure(true) + .keystorePassword(keystorePassword).keystoreFile(keystoreFile).keyAlias(keyAlias).build()); + + } + + schemaValidatorflag = settings.getInt(KSETTING_SCHEMAVALIDATOR, KDEFAULT_SCHEMAVALIDATOR); + if (schemaValidatorflag > 0) { + String schemaFile = settings.getString(KSETTING_SCHEMAFILE, KDEFAULT_SCHEMAFILE); + schemaFileJson = new JSONObject(schemaFile); + + } + authflag = settings.getInt(CommonStartup.KSETTING_AUTHFLAG, CommonStartup.KDEFAULT_AUTHFLAG); + String[] currentConfigFile = settings.getStrings(KSETTING_DMAAPCONFIGS, KDEFAULT_DMAAPCONFIGS); + cambriaConfigFile = currentConfigFile[0]; + streamID = settings.getString(KSETTING_DMAAPSTREAMID, null); + eventTransformFlag = settings.getInt(KSETTING_EVENTTRANSFORMFLAG, KDEFAULT_EVENTTRANSFORMFLAG); + + fTomcatServer = new ApiServer.Builder(connectors, new RestfulCollectorServlet(settings)).encodeSlashes(true) + .name("collector").build(); + } + + public static void main(String[] args) { + try { + final Map argMap = NsaCommandLineUtil.processCmdLine(args, true); + final String config = NsaCommandLineUtil.getSetting(argMap, KCONFIG, "collector.properties"); + final URL settingStream = DrumlinServlet.findStream(config, CommonStartup.class); + + final nvReadableStack settings = new nvReadableStack(); + settings.push(new nvPropertiesFile(settingStream)); + settings.push(new nvReadableTable(argMap)); + + fProcessingInputQueue = new LinkedBlockingQueue<>(CommonStartup.KDEFAULT_MAXQUEUEDEVENTS); + + VESLogger.setUpEcompLogging(); + + CommonStartup cs = new CommonStartup(settings); + + Thread commonStartupThread = new Thread(cs); + commonStartupThread.start(); + + EventProcessor ep = new EventProcessor(EventPublisher.createPublisher(oplog, + DMaaPConfigurationParser + .parseToDomainMapping(Paths.get(cambriaConfigFile)) + .get())); + ExecutorService executor = Executors.newFixedThreadPool(20); + for (int i = 0; i < 20; ++i) { + executor.execute(ep); + } + } catch (Exception e) { + CommonStartup.eplog.error("Fatal error during application startup", e); + throw new RuntimeException(e); + } + } + + public void run() { + try { + fTomcatServer.start(); + fTomcatServer.await(); + } catch (LifecycleException | IOException e) { + throw new RuntimeException(e); + } + } + + public static class QueueFullException extends Exception { + + private static final long serialVersionUID = 1L; + } + + public static void handleEvents(JSONArray a) throws QueueFullException, JSONException { + CommonStartup.metriclog.info("EVENT_PUBLISH_START"); + for (int i = 0; i < a.length(); i++) { + if (!fProcessingInputQueue.offer(a.getJSONObject(i))) { + throw new QueueFullException(); + } + } + log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); + CommonStartup.metriclog.info("EVENT_PUBLISH_END"); + } + + private static String readFile(String path) throws IOException { + byte[] encoded = Files.readAllBytes(Paths.get(path)); + String pwd = new String(encoded); + return pwd.substring(0, pwd.length() - 1); + } + + public static String validateAgainstSchema(String jsonData, String jsonSchema) { + ProcessingReport report; + String result = "false"; + + try { + log.trace("Schema validation for event:" + jsonData); + JsonNode schemaNode = JsonLoader.fromString(jsonSchema); + JsonNode data = JsonLoader.fromString(jsonData); + JsonSchemaFactory factory = JsonSchemaFactory.byDefault(); + JsonSchema schema = factory.getJsonSchema(schemaNode); + report = schema.validate(data); + } catch (JsonParseException e) { + log.error("validateAgainstSchema:JsonParseException for event:" + jsonData); + return e.getMessage(); + } catch (ProcessingException e) { + log.error("validateAgainstSchema:Processing exception for event:" + jsonData); + return e.getMessage(); + } catch (IOException e) { + log.error( + "validateAgainstSchema:IO exception; something went wrong trying to read json data for event:" + jsonData); + return e.getMessage(); + } + if (report != null) { + for (ProcessingMessage pm : report) { + log.trace("Processing Message: " + pm.getMessage()); + } + result = String.valueOf(report.isSuccess()); + } + try { + log.debug("Validation Result:" + result + " Validation report:" + report); + } catch (NullPointerException e) { + log.error("validateAgainstSchema:NullpointerException on report"); + } + return result; + } } diff --git a/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java b/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java deleted file mode 100644 index 0ee1e434..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java +++ /dev/null @@ -1,143 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * PROJECT - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcae.commonFunction; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.stream.Collectors; - -public class DmaapPropertyReader { - - - private static final Logger log = LoggerFactory.getLogger(DmaapPropertyReader.class); - private static final String CAMBRIA_TOPIC_KEY = "cambria.topic"; - private static final String CAMBRIA_HOSTS_KEY = "cambria.hosts"; - private static final String CAMBRIA_URL_KEY = "cambria.url"; - private static final List LEGACY_CHANNEL_MANDATORY_PARAMS = Lists.newArrayList(CAMBRIA_TOPIC_KEY, CAMBRIA_HOSTS_KEY, CAMBRIA_URL_KEY, "basicAuthPassword", "basicAuthUsername"); - private static DmaapPropertyReader instance = null; - private final Map dmaapProperties; - - public DmaapPropertyReader(String cambriaConfigFilePath) { - this.dmaapProperties = DmaapPropertyReader.getProcessedDmaapProperties(cambriaConfigFilePath); - } - - public Map getDmaapProperties() { - return dmaapProperties; - } - - public static synchronized DmaapPropertyReader getInstance(String channelConfig) { - if (instance == null) { - instance = new DmaapPropertyReader(channelConfig); - } - return instance; - } - - public String getKeyValue(String hashKey) { - return dmaapProperties.get(hashKey); - } - - private static Map getProcessedDmaapProperties(String configFilePath) { - Map transformedDmaapProperties = new HashMap<>(); - try { - AnyNode root = AnyNode.parse(configFilePath); - if (isInLegacyFormat(root)) { - transformedDmaapProperties = getLegacyDmaapPropertiesWithChannels(root.get("channels")); - } else { - transformedDmaapProperties = getDmaapPropertiesWithInfoData(root); - } - } catch (IOException e) { - e.printStackTrace(); - } - return transformedDmaapProperties; - } - - private static boolean isInLegacyFormat(AnyNode root) { - return root.hasKey("channels"); - } - - private static Map getLegacyDmaapPropertiesWithChannels(AnyNode channelsNode) { - return channelsNode.asList().stream() - .map(DmaapPropertyReader::getTransformedMandatoryChannelProperties) - .flatMap(m -> m.entrySet().stream()) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } - - private static Map getTransformedMandatoryChannelProperties(AnyNode channel) { - String prefix = channel.get("name").asString() + "."; - return channel.asMap().entrySet().stream().filter(el -> LEGACY_CHANNEL_MANDATORY_PARAMS.contains(el.getKey()) && !Objects.equals(el.getKey(), "name")) - .collect(Collectors.toMap(k -> prefix + k.getKey(), v -> v.getValue().asString().replace("\"", ""))); - } - - private static Map getDmaapPropertiesWithInfoData(AnyNode root) { - return root.asMap().entrySet().stream() - .map(DmaapPropertyReader::getTransformedMandatoryInfoProperties).flatMap(m -> m.entrySet().stream()) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } - - private static Map getTransformedMandatoryInfoProperties(Map.Entry el) { - String prefix = el.getKey() + "."; - AnyNode val = el.getValue(); - Map map = Maps.newHashMap(); - map.put(prefix + "basicAuthUsername", val.getAsOptional("aaf_username").orElse(AnyNode.nullValue()).asString().replace("\"", "")); - map.put(prefix + "basicAuthPassword", val.getAsOptional("aaf_password").orElse(AnyNode.nullValue()).asString().replace("\"", "")); - map.putAll(getParamsFromDmaapInfoTopicUrl(prefix, val.get("dmaap_info").get("topic_url").asString().replace("\"", ""))); - return map; - } - - /*** - * Dmaap url structure pub - https://:/events/ - * .., sub - https://: - * /events/../G1/u1"; - * - * Onap url structure pub - http://:/. - * , - */ - private static Map getParamsFromDmaapInfoTopicUrl(String keyPrefix, String topicUrl) { - Map topicUrlParts = Maps.newHashMap(); - try { - URL url = new URL(topicUrl); - topicUrlParts.put(keyPrefix + CAMBRIA_URL_KEY, url.getAuthority()); - String[] pathParts = url.getPath().split("/"); - if (pathParts.length > 2 && "events".equals(pathParts[1])) { - // DCAE internal dmaap topic convention - topicUrlParts.put(keyPrefix + CAMBRIA_TOPIC_KEY, pathParts[2]); - } else { - // ONAP dmaap topic convention - topicUrlParts.put(keyPrefix + CAMBRIA_TOPIC_KEY, pathParts[2]); - topicUrlParts.put(keyPrefix + CAMBRIA_HOSTS_KEY, url.getHost()); - } - } catch (MalformedURLException e) { - log.error("Invalid URL found under topic_url key!", e); - e.printStackTrace(); - } - return topicUrlParts; - } -} \ No newline at end of file diff --git a/src/main/java/org/onap/dcae/commonFunction/DmaapPublishers.java b/src/main/java/org/onap/dcae/commonFunction/DmaapPublishers.java deleted file mode 100644 index a4c62719..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/DmaapPublishers.java +++ /dev/null @@ -1,91 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.commonFunction; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.cache.RemovalListener; -import java.io.IOException; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.List; -import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class DmaapPublishers { - - private static final Logger log = LoggerFactory.getLogger(DmaapPublishers.class); - private final LoadingCache publishers; - - private DmaapPublishers( - LoadingCache publishers) { - this.publishers = publishers; - } - - static DmaapPublishers create() { - return create(new CambriaPublisherFactory()); - } - - static DmaapPublishers create(final CambriaPublisherFactory publisherFactory) { - final LoadingCache cache = CacheBuilder.newBuilder() - .removalListener((RemovalListener) notification -> { - if (notification.getValue() != null) { - onCacheItemInvalidated(notification.getValue()); - } - }) - .build(new CacheLoader() { - @Override - public CambriaBatchingPublisher load(String streamId) - throws MalformedURLException, GeneralSecurityException { - try { - return publisherFactory.createCambriaPublisher(streamId); - } catch (MalformedURLException | GeneralSecurityException e) { - log.error("CambriaClientBuilders connection reader exception : streamID - " + streamId + " " - + e.getMessage()); - throw e; - } - } - }); - return new DmaapPublishers(cache); - } - - CambriaBatchingPublisher getByStreamId(String streamId) { - return publishers.getUnchecked(streamId); - } - - void closeByStreamId(String streamId) { - publishers.invalidate(streamId); - } - - private static void onCacheItemInvalidated(CambriaBatchingPublisher pub) { - try { - final List stuck = pub.close(20, TimeUnit.SECONDS); - if (!stuck.isEmpty()) { - log.error(stuck.size() + " messages unsent"); - } - } catch (InterruptedException | IOException e) { - log.error("Caught Exception on Close event: {}", e); - } - } -} diff --git a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java index 06a27328..9d6ad360 100644 --- a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java +++ b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java @@ -27,6 +27,7 @@ import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import java.util.Map; import org.json.JSONObject; +import org.onap.dcae.commonFunction.event.publishing.EventPublisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,8 +53,10 @@ class EventProcessor implements Runnable { static Map streamidHash = new HashMap<>(); public JSONObject event; + private EventPublisher eventPublisher; - EventProcessor() { + public EventProcessor(EventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; streamidHash = parseStreamIdToStreamHashMapping(CommonStartup.streamID); } @@ -128,7 +131,7 @@ class EventProcessor implements Runnable { for (String aStreamIdList : streamIdList) { log.info("Invoking publisher for streamId:" + aStreamIdList); this.overrideEvent(); - EventPublisherHash.getInstance().sendEvent(event, aStreamIdList); + eventPublisher.sendEvent(event, aStreamIdList); } } diff --git a/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java b/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java deleted file mode 100644 index f3907126..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java +++ /dev/null @@ -1,85 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcae.commonFunction; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.clock.SaClock; -import com.att.nsa.logging.LoggingContext; -import com.att.nsa.logging.log4j.EcompFields; -import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; -import org.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class EventPublisherHash { - - private static final String VES_UNIQUE_ID = "VESuniqueId"; - private static final Logger log = LoggerFactory.getLogger(EventPublisherHash.class); - private static volatile EventPublisherHash instance = new EventPublisherHash(DmaapPublishers.create()); - private final DmaapPublishers dmaapPublishers; - - public static EventPublisherHash getInstance() { - return instance; - } - - @VisibleForTesting - EventPublisherHash(DmaapPublishers dmaapPublishers) { - this.dmaapPublishers = dmaapPublishers; - } - - void sendEvent(JSONObject event, String streamid) { - log.debug("EventPublisher.sendEvent: instance for publish is ready"); - clearVesUniqueId(event); - - try { - sendEventUsingCachedPublisher(streamid, event); - } catch (IOException | IllegalArgumentException e) { - log.error("Unable to publish event: {} streamID: {}. Exception: {}", event, streamid, e); - dmaapPublishers.closeByStreamId(streamid); - } - } - - private void clearVesUniqueId(JSONObject event) { - if (event.has(VES_UNIQUE_ID)) { - String uuid = event.get(VES_UNIQUE_ID).toString(); - LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); - localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - log.debug("Removing VESuniqueid object from event"); - event.remove(VES_UNIQUE_ID); - } - } - - private void sendEventUsingCachedPublisher(String streamid, JSONObject event) throws IOException { - int pendingMsgs = dmaapPublishers.getByStreamId(streamid).send("MyPartitionKey", event.toString()); - if (pendingMsgs > 100) { - log.info("Pending Message Count=" + pendingMsgs); - } - log.info("pub.send invoked - no error"); - CommonStartup.oplog.info(String.format("StreamID:%s Event Published:%s ", streamid, event)); - } - - @VisibleForTesting - CambriaBatchingPublisher getDmaapPublisher(String streamId) { - return dmaapPublishers.getByStreamId(streamId); - } -} diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java new file mode 100644 index 00000000..bef14448 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java @@ -0,0 +1,108 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction.event.publishing; + +import static io.vavr.API.List; +import static io.vavr.API.Try; +import static io.vavr.API.Tuple; +import static io.vavr.API.unchecked; +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError; +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; + +import io.vavr.collection.List; +import io.vavr.collection.Map; +import io.vavr.control.Option; +import io.vavr.control.Try; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import org.onap.dcae.commonFunction.AnyNode; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +@SuppressWarnings("mapFailure takes a generic varargs, unchecked because of Javas type system limitation, actually safe to do") +public final class DMaaPConfigurationParser { + + public static Try> parseToDomainMapping(Path configLocation) { + return readFromFile(configLocation) + .flatMap(DMaaPConfigurationParser::toJSON) + .flatMap(DMaaPConfigurationParser::toConfigMap); + } + + private static Try readFromFile(Path configLocation) { + return Try(() -> new String(Files.readAllBytes(configLocation))) + .mapFailure(enhanceError(f("Could not read DMaaP configuration from location: '%s'", configLocation))); + } + + private static Try toJSON(String config) { + return Try(() -> AnyNode.fromString(config)) + .mapFailure(enhanceError(f("DMaaP configuration '%s' is not a valid JSON document", config))); + } + + private static Try> toConfigMap(AnyNode config) { + return Try(() -> usesLegacyFormat(config) ? parseLegacyFormat(config) : parseNewFormat(config)) + .mapFailure(enhanceError( + f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config))); + } + + private static boolean usesLegacyFormat(AnyNode dMaaPConfig) { + return dMaaPConfig.has("channels"); + } + + private static Map parseLegacyFormat(AnyNode root) { + return root.get("channels").toList().toMap( + channel -> channel.get("name").toString(), + channel -> { + String destinationsStr = channel.getAsOption("cambria.url") + .getOrElse(channel.getAsOption("cambria.hosts").get()) + .toString(); + String topic = channel.get("cambria.topic").toString(); + Option maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString); + Option maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString); + List destinations = List(destinationsStr.split(",")); + return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations); + }); + } + + private static Map parseNewFormat(AnyNode root) { + return root.keys().toMap( + channelName -> channelName, + channelName -> { + AnyNode channelConfig = root.get(channelName); + Option maybeUser = channelConfig.getAsOption("aaf_username").map(AnyNode::toString); + Option maybePassword = channelConfig.getAsOption("aaf_password").map(AnyNode::toString); + URL topicURL = unchecked( + () -> new URL(channelConfig.get("dmaap_info").get("topic_url").toString())).apply(); + String[] pathSegments = topicURL.getPath().substring(1).split("/"); + String topic = pathSegments[1]; + String destination = "events".equals(pathSegments[0]) ? topicURL.getAuthority() : topicURL.getHost(); + List destinations = List(destination); + return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations); + }); + } + + private static PublisherConfig buildBasedOnAuth(Option maybeUser, Option maybePassword, + String topic, List destinations) { + return maybeUser.flatMap(user -> maybePassword.map(password -> Tuple(user, password))) + .map(credentials -> new PublisherConfig(destinations, topic, credentials._1, credentials._2)) + .getOrElse(new PublisherConfig(destinations, topic)); + } +} diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java new file mode 100644 index 00000000..fd9b3ae1 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java @@ -0,0 +1,99 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.commonFunction.event.publishing; + +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.clock.SaClock; +import com.att.nsa.logging.LoggingContext; +import com.att.nsa.logging.log4j.EcompFields; +import io.vavr.collection.Map; +import io.vavr.control.Try; +import java.io.IOException; +import org.json.JSONObject; +import org.onap.dcae.commonFunction.VESLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +class DMaaPEventPublisher implements EventPublisher { + private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100; + private static final String VES_UNIQUE_ID = "VESuniqueId"; + private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class); + private final DMaaPPublishersCache publishersCache; + private final Logger outputLogger; + + DMaaPEventPublisher(DMaaPPublishersCache DMaaPPublishersCache, + Logger outputLogger) { + this.publishersCache = DMaaPPublishersCache; + this.outputLogger = outputLogger; + } + + @Override + public void sendEvent(JSONObject event, String domain) { + clearVesUniqueIdFromEvent(event); + publishersCache.getPublisher(domain) + .onEmpty(() -> + log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event))) + .forEach(publisher -> sendEvent(event, domain, publisher)); + } + + @Override + public void reconfigure(Map dMaaPConfig) { + publishersCache.reconfigure(dMaaPConfig); + } + + private void sendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) { + Try.run(() -> uncheckedSendEvent(event, domain, publisher)) + .onFailure(exc -> closePublisher(event, domain, exc)); + } + + private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) + throws IOException { + int pendingMsgs = publisher.send("MyPartitionKey", event.toString()); + if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) { + log.info("Pending messages count: " + pendingMsgs); + } + String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, domain); + log.info(infoMsg); + outputLogger.info(infoMsg); + } + + private void closePublisher(JSONObject event, String domain, Throwable e) { + log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.", + event, domain), e); + publishersCache.closePublisherFor(domain); + } + + private void clearVesUniqueIdFromEvent(JSONObject event) { + if (event.has(VES_UNIQUE_ID)) { + String uuid = event.get(VES_UNIQUE_ID).toString(); + LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); + localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); + log.debug("Removing VESuniqueid object from event"); + event.remove(VES_UNIQUE_ID); + } + } +} diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java new file mode 100644 index 00000000..a7865a45 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java @@ -0,0 +1,62 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction.event.publishing; + +import static io.vavr.API.Try; +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError; +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; +import io.vavr.control.Try; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +final class DMaaPPublishersBuilder { + + @SuppressWarnings("mapFailure takes a generic varargs, unchecked because of Javas type system limitation, actually safe to do") + static Try buildPublisher(PublisherConfig config) { + return Try(() -> builder(config).build()) + .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config))); + } + + private static PublisherBuilder builder(PublisherConfig config) { + if (config.isSecured()) { + return authenticatedBuilder(config); + } else { + return unAuthenticatedBuilder(config); + } + } + + private static PublisherBuilder authenticatedBuilder(PublisherConfig config) { + return unAuthenticatedBuilder(config) + .usingHttps() + .authenticatedByHttp(config.userName().get(), config.password().get()); + } + + private static PublisherBuilder unAuthenticatedBuilder(PublisherConfig config) { + return new CambriaClientBuilders.PublisherBuilder() + .usingHosts(config.destinations().mkString(",")) + .onTopic(config.topic()) + .logSendFailuresAfter(5); + } +} diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java new file mode 100644 index 00000000..102d2774 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java @@ -0,0 +1,124 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction.event.publishing; + +import static io.vavr.API.Option; +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import io.vavr.collection.Map; +import io.vavr.control.Option; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nonnull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +class DMaaPPublishersCache { + + private static final Logger log = LoggerFactory.getLogger(DMaaPPublishersCache.class); + private final LoadingCache publishersCache; + private AtomicReference> dMaaPConfiguration; + + DMaaPPublishersCache(Map dMaaPConfiguration) { + this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); + this.publishersCache = CacheBuilder.newBuilder() + .removalListener(new OnPublisherRemovalListener()) + .build(new CambriaPublishersCacheLoader()); + } + + DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader, + OnPublisherRemovalListener onPublisherRemovalListener, + Map dMaaPConfiguration) { + this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); + this.publishersCache = CacheBuilder.newBuilder() + .removalListener(onPublisherRemovalListener) + .build(dMaaPPublishersCacheLoader); + } + + Option getPublisher(String streamID) { + try { + return Option(publishersCache.getUnchecked(streamID)); + } catch (Exception e) { + log.warn("Could not create / load Cambria Publisher for streamID", e); + return Option.none(); + } + } + + void closePublisherFor(String streamId) { + publishersCache.invalidate(streamId); + } + + synchronized void reconfigure(Map newConfig) { + Map currentConfig = dMaaPConfiguration.get(); + Map removedConfigurations = currentConfig + .filterKeys(domain -> !newConfig.containsKey(domain)); + Map changedConfigurations = newConfig + .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e))); + dMaaPConfiguration.set(newConfig); + removedConfigurations.merge(changedConfigurations).forEach(e -> publishersCache.invalidate(e._1)); + } + + static class OnPublisherRemovalListener implements RemovalListener { + + @Override + public void onRemoval(@Nonnull RemovalNotification notification) { + CambriaBatchingPublisher publisher = notification.getValue(); + if (publisher != null) { // The value might get Garbage Collected at this moment, regardless of @Nonnull + try { + int timeout = 20; + TimeUnit unit = TimeUnit.SECONDS; + java.util.List stuck = publisher.close(timeout, unit); + if (!stuck.isEmpty()) { + log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', " + + "%s messages were dropped", stuck.size(), timeout, unit)); + } + } catch (InterruptedException | IOException e) { + log.error("Could not close Cambria publisher, some messages might have been dropped", e); + } + } + } + } + + class CambriaPublishersCacheLoader extends CacheLoader { + + @Override + public CambriaBatchingPublisher load(@Nonnull String domain) { + return dMaaPConfiguration.get() + .get(domain) + .toTry(() -> new RuntimeException( + f("DMaaP configuration contains no configuration for domain: '%s'", domain))) + .flatMap(DMaaPPublishersBuilder::buildPublisher) + .get(); + } + } + +} diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/EventPublisher.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/EventPublisher.java new file mode 100644 index 00000000..9cd718f8 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/EventPublisher.java @@ -0,0 +1,38 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction.event.publishing; + +import io.vavr.collection.Map; +import org.json.JSONObject; +import org.slf4j.Logger; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +public interface EventPublisher { + + static EventPublisher createPublisher(Logger outputLogger, Map dMaaPConfig) { + return new DMaaPEventPublisher(new DMaaPPublishersCache(dMaaPConfig), outputLogger); + } + + void sendEvent(JSONObject event, String domain); + + void reconfigure(Map dMaaPConfig); +} diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java new file mode 100644 index 00000000..4a056778 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java @@ -0,0 +1,98 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction.event.publishing; + +import io.vavr.collection.List; +import io.vavr.control.Option; +import java.util.Objects; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +public final class PublisherConfig { + + private final List destinations; + private final String topic; + private String userName; + private String password; + + PublisherConfig(List destinations, String topic) { + this.destinations = destinations; + this.topic = topic; + } + + PublisherConfig(List destinations, String topic, String userName, String password) { + this.destinations = destinations; + this.topic = topic; + this.userName = userName; + this.password = password; + } + + List destinations() { + return destinations; + } + + String topic() { + return topic; + } + + Option userName() { + return Option.of(userName); + } + + Option password() { + return Option.of(password); + } + + boolean isSecured() { + return userName().isDefined() && password().isDefined(); + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PublisherConfig that = (PublisherConfig) o; + return Objects.equals(destinations, that.destinations) && + Objects.equals(topic, that.topic) && + Objects.equals(userName, that.userName) && + Objects.equals(password, that.password); + } + + @Override + public int hashCode() { + return Objects.hash(destinations, topic, userName, password); + } + + @Override + public String toString() { + return "PublisherConfig{" + + "destinations=" + destinations + + ", topic='" + topic + '\'' + + ", userName='" + userName + '\'' + + ", password='" + password + '\'' + + '}'; + } +} diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java new file mode 100644 index 00000000..9bf3ef8c --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java @@ -0,0 +1,51 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction.event.publishing; + +import static io.vavr.API.$; + +import io.vavr.API; +import io.vavr.API.Match.Case; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +final class VavrUtils { + + private VavrUtils() { + // utils aggregator + } + + /** + * Shortcut for 'string interpolation' + */ + static String f(String msg, Object... args) { + return String.format(msg, args); + } + + /** + * Wrap failure with a more descriptive message of what has failed and chain original cause. Used to provide a + * context for errors instead of raw exception. + */ + static Case enhanceError(String msg) { + return API.Case($(), e -> new RuntimeException(msg, e)); + } + +} diff --git a/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java b/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java deleted file mode 100644 index f4955ac8..00000000 --- a/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java +++ /dev/null @@ -1,144 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.commonFunction; - -import static org.hamcrest.CoreMatchers.allOf; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.junit.Assert.assertSame; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaPublisher; -import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.UncheckedExecutionException; -import java.io.IOException; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.concurrent.TimeUnit; -import org.hamcrest.BaseMatcher; -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - - -@RunWith(MockitoJUnitRunner.class) -public class DmaapPublishersTest { - - @Mock - private CambriaPublisherFactory publisherFactory; - @Mock - private CambriaBatchingPublisher cambriaPublisher; - private DmaapPublishers cut; - - @Rule - public final ExpectedException expectedException = ExpectedException.none(); - - @Before - public void setUp() throws MalformedURLException, GeneralSecurityException { - given(publisherFactory.createCambriaPublisher(anyString())).willReturn(cambriaPublisher); - cut = DmaapPublishers.create(publisherFactory); - } - - @Test - public void getByStreamIdShouldUseCachedItem() throws IOException, GeneralSecurityException { - // given - String streamId = "sampleStream"; - - // when - CambriaBatchingPublisher firstPublisher = cut.getByStreamId(streamId); - CambriaBatchingPublisher secondPublisher = cut.getByStreamId(streamId); - - // then - verify(publisherFactory, times(1)).createCambriaPublisher(streamId); - assertSame("should return same instance", firstPublisher, secondPublisher); - } - - @Test - public void getByStreamIdShouldHandleErrors() throws MalformedURLException, GeneralSecurityException { - // given - MalformedURLException exception = new MalformedURLException(); - given(publisherFactory.createCambriaPublisher(anyString())).willThrow(exception); - expectedException.expect(allOf( - instanceOf(UncheckedExecutionException.class), - causeIsInstanceOf(exception.getClass()))); - - // when - cut.getByStreamId("a stream"); - - // then - // exception should have been thrown - } - - @Test - public void closeByStreamIdShouldCloseConnection() throws IOException, InterruptedException { - // given - String streamId = "sampleStream"; - given(cambriaPublisher.close(anyLong(), any(TimeUnit.class))) - .willReturn(ImmutableList.of(new CambriaPublisher.message("p", "msg"))); - - // when - CambriaBatchingPublisher cachedPublisher = cut.getByStreamId(streamId); - cut.closeByStreamId(streamId); - - // then - assertSame("should return proper publisher", cambriaPublisher, cachedPublisher); - verify(cambriaPublisher).close(20, TimeUnit.SECONDS); - } - - @Test - public void closeByStreamIdShouldHandleErrors() throws IOException, InterruptedException { - // given - String streamId = "sampleStream"; - given(cambriaPublisher.close(anyLong(), any(TimeUnit.class))).willThrow(IOException.class); - - // when - CambriaBatchingPublisher cachedPublisher = cut.getByStreamId(streamId); - cut.closeByStreamId(streamId); - - // then - assertSame("should return proper publisher", cambriaPublisher, cachedPublisher); - verify(cambriaPublisher).close(20, TimeUnit.SECONDS); - } - - private Matcher causeIsInstanceOf(final Class clazz) { - return new BaseMatcher() { - @Override - public boolean matches(Object o) { - return o instanceof Throwable && clazz.isInstance(((Throwable) o).getCause()); - } - - @Override - public void describeTo(Description description) { - description.appendText("exception cause should be an instance of " + clazz.getName()); - } - }; - } -} \ No newline at end of file diff --git a/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java b/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java index 973ee014..e211c12a 100644 --- a/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java +++ b/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java @@ -20,17 +20,17 @@ */ package org.onap.dcae.commonFunction; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; import com.google.gson.Gson; +import java.util.concurrent.atomic.AtomicReference; import org.json.JSONObject; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; import java.util.List; +import org.onap.dcae.commonFunction.event.publishing.EventPublisher; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -52,7 +52,7 @@ public class EventProcessorTest { @Test public void testLoad() { //given - EventProcessor ec = new EventProcessor(); + EventProcessor ec = new EventProcessor(mock(EventPublisher.class)); ec.event = new org.json.JSONObject(ev); //when ec.overrideEvent(); @@ -65,7 +65,7 @@ public class EventProcessorTest { @Test public void shouldParseJsonEvents() throws ReflectiveOperationException { //given - EventProcessor eventProcessor = new EventProcessor(); + EventProcessor eventProcessor = new EventProcessor(mock(EventPublisher.class)); String event_json = "[{ \"filter\": {\"event.commonEventHeader.domain\":\"heartbeat\",\"VESversion\":\"v4\"},\"processors\":[" + "{\"functionName\": \"concatenateValue\",\"args\":{\"field\":\"event.commonEventHeader.eventName\",\"concatenate\": [\"$event.commonEventHeader.domain\",\"$event.commonEventHeader.eventType\",\"$event.faultFields.alarmCondition\"], \"delimiter\":\"_\"}}" + ",{\"functionName\": \"addAttribute\",\"args\":{\"field\": \"event.heartbeatFields.heartbeatFieldsVersion\",\"value\": \"1.0\",\"fieldType\": \"number\"}}" + @@ -84,32 +84,5 @@ public class EventProcessorTest { assertThat(stringArgumentCaptor.getAllValues()).contains("concatenateValue", "addAttribute", "map"); } - @Test - public void shouldCreateDmaapPublisher() { - - //given - EventPublisherHash eph = EventPublisherHash.getInstance(); - EventProcessor ec = new EventProcessor(); - ec.event = new org.json.JSONObject(ev); - CommonStartup.cambriaConfigFile = "src/test/resources/testDmaapConfig_ip.json"; - - //when - CambriaBatchingPublisher pub = eph.getDmaapPublisher("sec_fault_ueb"); - - //then - assertNotNull(pub); - } - - @Test - public void shouldSendEventWithNoError() { - - EventPublisherHash eph = EventPublisherHash.getInstance(); - EventProcessor eventProcessor = new EventProcessor(); - eventProcessor.event = new org.json.JSONObject(ev); - CommonStartup.cambriaConfigFile = "src/test/resources/testDmaapConfig_ip.json"; - - //when - eph.sendEvent(eventProcessor.event, "sec_fault_ueb"); - } } diff --git a/src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java b/src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java deleted file mode 100644 index 81c6556b..00000000 --- a/src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java +++ /dev/null @@ -1,90 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.commonFunction; - -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.verify; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import java.io.IOException; -import org.json.JSONObject; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class EventPublisherHashTest { - private EventPublisherHash cut; - - @Mock - private DmaapPublishers dmaapPublishers; - @Mock - private CambriaBatchingPublisher cambriaPublisher; - - @Before - public void setUp() { - given(dmaapPublishers.getByStreamId(anyString())).willReturn(cambriaPublisher); - - cut = new EventPublisherHash(dmaapPublishers); - } - - @Test - public void sendEventShouldSendEventToATopic() throws Exception { - // given - JSONObject event = new JSONObject("{}"); - final String streamId = "sampleStreamId"; - - // when - cut.sendEvent(event, streamId); - - // then - verify(cambriaPublisher).send("MyPartitionKey", event.toString()); - } - - @Test - public void sendEventShouldRemoveUuid() throws Exception { - // given - JSONObject event = new JSONObject("{\"VESuniqueId\": \"362e0146-ec5f-45f3-8d8f-bfe877c3f58e\", \"another\": 8}"); - final String streamId = "sampleStreamId"; - - // when - cut.sendEvent(event, streamId); - - // then - verify(cambriaPublisher).send("MyPartitionKey", new JSONObject("{\"another\": 8}").toString()); - } - - @Test - public void sendEventShouldCloseConnectionWhenExceptionOccurred() throws Exception { - // given - JSONObject event = new JSONObject("{}"); - final String streamId = "sampleStreamId"; - given(cambriaPublisher.send(anyString(), anyString())).willThrow(new IOException("epic fail")); - - // when - cut.sendEvent(event, streamId); - - // then - verify(dmaapPublishers).closeByStreamId(streamId); - } -} \ No newline at end of file diff --git a/src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java b/src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java index e0fd5a42..12428024 100644 --- a/src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java +++ b/src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java @@ -22,6 +22,7 @@ package org.onap.dcae.commonFunction; import static java.util.Base64.getDecoder; import static java.util.Base64.getEncoder; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import com.att.nsa.cmdLine.NsaCommandLineUtil; @@ -43,6 +44,7 @@ import org.json.JSONObject; import org.junit.Test; import org.mockito.Mockito; import org.onap.dcae.commonFunction.CommonStartup.QueueFullException; +import org.onap.dcae.commonFunction.event.publishing.EventPublisher; import org.onap.dcae.restapi.RestfulCollectorServlet; @@ -79,7 +81,7 @@ public class TestCommonStartup { public void testParseStreamIdToStreamHashMapping() { // given CommonStartup.streamID = "fault=sec_fault|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert|voiceQuality=ves_voicequality|sipSignaling=ves_sipsignaling"; - EventProcessor eventProcessor = new EventProcessor(); + EventProcessor eventProcessor = new EventProcessor(mock(EventPublisher.class)); // when Map streamHashMapping = EventProcessor.streamidHash; diff --git a/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParserTest.java b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParserTest.java new file mode 100644 index 00000000..5a94c662 --- /dev/null +++ b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParserTest.java @@ -0,0 +1,114 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction.event.publishing; + +import static io.vavr.API.List; +import static org.assertj.core.api.Assertions.assertThat; +import static org.onap.dcae.commonFunction.event.publishing.DMaaPConfigurationParser.parseToDomainMapping; + +import io.vavr.collection.Map; +import io.vavr.control.Try; +import java.nio.file.Path; +import java.nio.file.Paths; +import org.junit.Test; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +public class DMaaPConfigurationParserTest { + + @Test + public void testParseCredentialsForGen2() { + Path path = Paths.get("src/test/resources/testParseDMaaPCredentialsGen2.json"); + Try> publisherConfigs = parseToDomainMapping(path); + + PublisherConfig authCredentialsNulls = publisherConfigs.get().get("auth-credentials-null").getOrNull(); + assertThat(authCredentialsNulls.userName().isEmpty()).isTrue(); + assertThat(authCredentialsNulls.password().isEmpty()).isTrue(); + assertThat(authCredentialsNulls.isSecured()).isFalse(); + + PublisherConfig authCredentialsPresent = publisherConfigs.get().get("auth-credentials-present").getOrNull(); + assertThat(authCredentialsPresent.userName().getOrNull()).isEqualTo("sampleUser"); + assertThat(authCredentialsPresent.password().getOrNull()).isEqualTo("samplePassword"); + assertThat(authCredentialsPresent.isSecured()).isTrue(); + + PublisherConfig authCredentialsKeysMissing = publisherConfigs.get().get("auth-credentials-missing").getOrNull(); + assertThat(authCredentialsKeysMissing.userName().isEmpty()).isTrue(); + assertThat(authCredentialsKeysMissing.password().isEmpty()).isTrue(); + assertThat(authCredentialsKeysMissing.isSecured()).isFalse(); + } + + + @Test + public void testParseCredentialsForLegacy() { + Path path = Paths.get("src/test/resources/testParseDMaaPCredentialsLegacy.json"); + Try> publisherConfigs = parseToDomainMapping(path); + + PublisherConfig authCredentialsNull = publisherConfigs.get().get("auth-credentials-null").getOrNull(); + assertThat(authCredentialsNull.userName().isEmpty()).isTrue(); + assertThat(authCredentialsNull.password().isEmpty()).isTrue(); + assertThat(authCredentialsNull.isSecured()).isFalse(); + + PublisherConfig authCredentialsPresent = publisherConfigs.get().get("auth-credentials-present").getOrNull(); + assertThat(authCredentialsPresent.userName().getOrNull()).isEqualTo("sampleUser"); + assertThat(authCredentialsPresent.password().getOrNull()).isEqualTo("samplePassword"); + assertThat(authCredentialsPresent.isSecured()).isTrue(); + + PublisherConfig authCredentialsMissing = publisherConfigs.get().get("auth-credentials-missing").getOrNull(); + assertThat(authCredentialsMissing.userName().isEmpty()).isTrue(); + assertThat(authCredentialsMissing.password().isEmpty()).isTrue(); + assertThat(authCredentialsMissing.isSecured()).isFalse(); + } + + + @Test + public void testParseGen2() { + Path path = Paths.get("src/test/resources/testParseDMaaPGen2.json"); + Try> publisherConfigs = parseToDomainMapping(path); + + PublisherConfig withEventsSegment = publisherConfigs.get().get("event-segments-with-port").getOrNull(); + assertThat(withEventsSegment.destinations()).isEqualTo(List("UEBHOST:3904")); + assertThat(withEventsSegment.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV"); + + PublisherConfig withOtherSegment = publisherConfigs.get().get("other-segments-without-ports").getOrNull(); + assertThat(withOtherSegment.destinations()).isEqualTo(List("UEBHOST")); + assertThat(withOtherSegment.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV"); + } + + @Test + public void testParseLegacy() { + Path exemplaryConfig = Paths.get("src/test/resources/testParseDMaaPLegacy.json"); + Try> publisherConfigs = DMaaPConfigurationParser + .parseToDomainMapping(exemplaryConfig); + + PublisherConfig urlFirstThenHosts = publisherConfigs.get().get("url-precedes-hosts").getOrNull(); + assertThat(urlFirstThenHosts.destinations()).isEqualTo(List("127.0.0.1:3904")); + assertThat(urlFirstThenHosts.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV"); + + PublisherConfig urlKeyMissing = publisherConfigs.get().get("url-key-missing").getOrNull(); + assertThat(urlKeyMissing.destinations()).isEqualTo(List("h1.att.com", "h2.att.com")); + assertThat(urlKeyMissing.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV"); + + PublisherConfig urlIsMissing = publisherConfigs.get().get("url-is-null").getOrNull(); + assertThat(urlIsMissing.destinations()).isEqualTo(List("h1.att.com", "h2.att.com")); + assertThat(urlIsMissing.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV"); + } +} \ No newline at end of file diff --git a/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisherTest.java b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisherTest.java new file mode 100644 index 00000000..bbe5079e --- /dev/null +++ b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisherTest.java @@ -0,0 +1,89 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction.event.publishing; + +import static io.vavr.API.Option; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import java.io.IOException; +import org.json.JSONObject; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; + +public class DMaaPEventPublisherTest { + + private static final String STREAM_ID = "sampleStreamId"; + + private DMaaPEventPublisher eventPublisher; + private CambriaBatchingPublisher cambriaPublisher; + private DMaaPPublishersCache DMaaPPublishersCache; + + @Before + public void setUp() { + cambriaPublisher = mock(CambriaBatchingPublisher.class); + DMaaPPublishersCache = mock(DMaaPPublishersCache.class); + when(DMaaPPublishersCache.getPublisher(anyString())).thenReturn(Option(cambriaPublisher)); + eventPublisher = new DMaaPEventPublisher(DMaaPPublishersCache, mock(Logger.class)); + } + + @Test + public void shouldSendEventToTopic() throws Exception { + // given + JSONObject event = new JSONObject("{}"); + + // when + eventPublisher.sendEvent(event, STREAM_ID); + + // then + verify(cambriaPublisher).send("MyPartitionKey", event.toString()); + } + + @Test + public void shouldRemoveInternalVESUIDBeforeSending() throws Exception { + // given + JSONObject event = new JSONObject( + "{\"VESuniqueId\": \"362e0146-ec5f-45f3-8d8f-bfe877c3f58e\", \"another\": 8}"); + + // when + eventPublisher.sendEvent(event, STREAM_ID); + + // then + verify(cambriaPublisher).send("MyPartitionKey", new JSONObject("{\"another\": 8}").toString()); + } + + @Test + public void shouldCloseConnectionWhenExceptionOccurred() throws Exception { + // given + JSONObject event = new JSONObject("{}"); + given(cambriaPublisher.send(anyString(), anyString())).willThrow(new IOException("epic fail")); + + // when + eventPublisher.sendEvent(event, STREAM_ID); + + // then + verify(DMaaPPublishersCache).closePublisherFor(STREAM_ID); + } +} \ No newline at end of file diff --git a/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCacheTest.java b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCacheTest.java new file mode 100644 index 00000000..8dc69f62 --- /dev/null +++ b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCacheTest.java @@ -0,0 +1,126 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction.event.publishing; + +import static io.vavr.API.List; +import static io.vavr.API.Map; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import io.vavr.collection.Map; +import io.vavr.control.Option; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; +import org.onap.dcae.commonFunction.event.publishing.DMaaPPublishersCache.CambriaPublishersCacheLoader; +import org.onap.dcae.commonFunction.event.publishing.DMaaPPublishersCache.OnPublisherRemovalListener; + + +public class DMaaPPublishersCacheTest { + + private String streamId1; + private Map dMaaPConfigs; + + @Before + public void setUp() { + streamId1 = "sampleStream1"; + dMaaPConfigs = Map("sampleStream1", new PublisherConfig(List("destination1"), "topic1")); + } + + @Test + public void shouldReturnTheSameCachedInstanceOnConsecutiveRetrievals() { + // given + DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs); + + // when + Option firstPublisher = dMaaPPublishersCache.getPublisher(streamId1); + Option secondPublisher = dMaaPPublishersCache.getPublisher(streamId1); + + // then + assertSame("should return same instance", firstPublisher.get(), secondPublisher.get()); + } + + @Test + public void shouldCloseCambriaPublisherOnCacheInvalidate() throws IOException, InterruptedException { + // given + CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class); + CambriaPublishersCacheLoader cacheLoaderMock = mock(CambriaPublishersCacheLoader.class); + DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock, + new OnPublisherRemovalListener(), + dMaaPConfigs); + when(cacheLoaderMock.load(streamId1)).thenReturn(cambriaPublisherMock1); + + // when + dMaaPPublishersCache.getPublisher(streamId1); + dMaaPPublishersCache.closePublisherFor(streamId1); + + // then + verify(cambriaPublisherMock1).close(20, TimeUnit.SECONDS); + + } + + @Test + public void shouldReturnNoneIfThereIsNoDMaaPConfigurationForGivenStreamID() { + // given + DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs); + + // then + assertTrue("should not exist", dMaaPPublishersCache.getPublisher("non-existing").isEmpty()); + } + + + @Test + public void shouldCloseOnlyChangedPublishers() throws IOException, InterruptedException { + // given + CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class); + CambriaBatchingPublisher cambriaPublisherMock2 = mock(CambriaBatchingPublisher.class); + CambriaPublishersCacheLoader cacheLoaderMock = mock(CambriaPublishersCacheLoader.class); + String firstDomain = "domain1"; + String secondDomain = "domain2"; + Map oldConfig = Map(firstDomain, + new PublisherConfig(List("destination1"), "topic1"), + secondDomain, + new PublisherConfig(List("destination2"), "topic2", + "user", "pass")); + Map newConfig = Map(firstDomain, new PublisherConfig(List("destination1"), "topic1"), + secondDomain, new PublisherConfig(List("destination2"), "topic2")); + DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock, + new OnPublisherRemovalListener(), + oldConfig); + when(cacheLoaderMock.load(firstDomain)).thenReturn(cambriaPublisherMock1); + when(cacheLoaderMock.load(secondDomain)).thenReturn(cambriaPublisherMock2); + + dMaaPPublishersCache.getPublisher(firstDomain); + dMaaPPublishersCache.getPublisher(secondDomain); + + // when + dMaaPPublishersCache.reconfigure(newConfig); + + // then + verify(cambriaPublisherMock2).close(20, TimeUnit.SECONDS); + verifyZeroInteractions(cambriaPublisherMock1); + } +} \ No newline at end of file diff --git a/src/test/java/org/onap/dcae/vestest/AnyNodeTest.java b/src/test/java/org/onap/dcae/vestest/AnyNodeTest.java index 695f53c9..9400e46d 100644 --- a/src/test/java/org/onap/dcae/vestest/AnyNodeTest.java +++ b/src/test/java/org/onap/dcae/vestest/AnyNodeTest.java @@ -19,89 +19,45 @@ */ package org.onap.dcae.vestest; -import com.google.common.collect.ImmutableMap; +import static org.assertj.core.api.Assertions.assertThat; + import com.google.common.collect.Sets; -import org.json.JSONObject; +import java.util.Set; import org.junit.BeforeClass; import org.junit.Test; import org.onap.dcae.commonFunction.AnyNode; -import java.io.IOException; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - -import static org.assertj.core.api.Assertions.assertThat; - /** * Created by koblosz on 07.06.18. */ public class AnyNodeTest { - private static final String SAMPLE_JSON_FILEPATH = "src/test/resources/test_anynode_class.json"; - private static final Map EXPECTED_RAW_MAP = ImmutableMap.builder().put("a", 1).put("b", 2).build(); - private static final Set EXPECTED_JSON_KEYS = Sets.newHashSet("channels", "sampleStrList", "sampleNestedObject", "sampleInt", "sampleString", "sampleNull"); + private static final String SAMPLE_JSON_FILEPATH = "{\n" + + " \"channels\": [{\n" + + " \"one\": \"number1\", \"two\": \"number2\", \"three\": \"number3\"}],\n" + + " \"sampleStrList\": [\"1\", \"2\", \"3\", \"4\", \"5\"],\n" + + " \"sampleNestedObject\": {\"a\": 1, \"b\": 2},\n" + + " \"sampleInt\": 1,\n" + + " \"sampleString\": \"str\",\n" + + " \"sampleNull\": null\n" + + "}\n"; + private static final Set EXPECTED_JSON_KEYS = Sets + .newHashSet("channels", "sampleStrList", "sampleNestedObject", "sampleInt", "sampleString", "sampleNull"); private static AnyNode node; @BeforeClass - public static void setUpClass() throws IOException { - node = AnyNode.parse(SAMPLE_JSON_FILEPATH); - } - - @Test(expected = IOException.class) - public void testShouldRethrowExceptionWhenFileNotFound() throws IOException { - AnyNode.parse("not/existing/path"); + public static void setUpClass() { + node = AnyNode.fromString(SAMPLE_JSON_FILEPATH); } @Test public void testShouldReturnJsonObjectKeySet() { - assertThat(node.getKeys()).containsOnlyElementsOf(EXPECTED_JSON_KEYS); - } - - @Test - public void testShouldGetElementAsString() { - assertThat(node.get("sampleStrList").get(0).asString()).isEqualTo("1"); - } - - @Test - public void testShouldGetElementAsInt() { - assertThat(node.get("sampleInt").asInt()).isSameAs(1); - } - - @Test - public void testWhenNullValuePresentShouldReturnJsonObjectNullAsString() { - assertThat(node.get("sampleNull").asString()).isSameAs(JSONObject.NULL.toString()); + assertThat(node.keys()).containsOnlyElementsOf(EXPECTED_JSON_KEYS); } - @Test - public void testShouldGetJsonObjectAsStringToObjectMap() { - assertThat(node.get("sampleNestedObject").asRawMap()).containsAllEntriesOf(EXPECTED_RAW_MAP); - } - - @Test - public void testShouldGetAsMap() { - assertThat(node.asMap().keySet()).containsOnlyElementsOf(EXPECTED_JSON_KEYS); - } - - @Test - public void testShouldGetAsList() { - assertThat(node.get("sampleStrList").asList().stream().map(AnyNode::asString).collect(Collectors.toList())).containsExactly("1", "2", "3", "4", "5"); - } - - @Test - public void testShouldGetAsOptional() { - assertThat(node.getAsOptional("absentKey")).isNotPresent(); - } - - @Test - public void testWhenChainMethodsShouldReturnValue() { - assertThat(node.get("channels").get(0).get("two").asString()).isEqualTo("number2"); - } - - @Test(expected = ClassCastException.class) public void whenInvokedOnJsonObjInsteadOfJsonArrShouldRaiseRuntimeEx() { - node.asList(); + node.toList(); } } \ No newline at end of file diff --git a/src/test/java/org/onap/dcae/vestest/DmaapPropertyReaderTest.java b/src/test/java/org/onap/dcae/vestest/DmaapPropertyReaderTest.java deleted file mode 100644 index 46f5da4b..00000000 --- a/src/test/java/org/onap/dcae/vestest/DmaapPropertyReaderTest.java +++ /dev/null @@ -1,120 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.vestest; - -import com.google.common.collect.ImmutableMap; -import org.junit.Test; -import org.onap.dcae.commonFunction.DmaapPropertyReader; - -import java.util.Map; - -import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; - -public class DmaapPropertyReaderTest { - - - private static final String legacyConfigFilePath = "src/test/resources/testDmaapConfig_ip.json"; - private static final String dmaapInputConfigFilePath = "src/test/resources/testDmaapConfig_gen2.json"; - private static final String fullDmaapConfigWithChannels = "src/test/resources/testFullDmaapConfig_channels.json"; - private static final String fullGen2DmaapConfig = "src/test/resources/testFullDmaapConfig_gen2.json"; - - private static final String FAULT_UEB_KEY_PREFIX = "sec_fault_ueb"; - private static final String VES_ALERT_SND_KEY_PREFIX = "ves-thresholdCrossingAlert-secondary"; - private static final String VES_FAULT_SECONDARY = "ves-fault-secondary"; - - private static final String FAULT_BASIC_AUTH_USERNAME_KEY = VES_FAULT_SECONDARY + ".basicAuthUsername"; - private static final String ALERT_BASIC_AUTH_PWD_KEY = VES_ALERT_SND_KEY_PREFIX + ".basicAuthPassword"; - - private static final String VES_ALERT_CAMBRIA_TOPIC_KEY = VES_ALERT_SND_KEY_PREFIX + ".cambria.topic"; - private static final String VES_ALERT_CAMBRIA_URL_KEY = VES_ALERT_SND_KEY_PREFIX + ".cambria.url"; - private static final String VES_FAULT_SND_CAMBRIA_URL_KEY = VES_FAULT_SECONDARY + ".cambria.url"; - private static final String VES_FAULT_SND_AUTH_PWD_KEY = VES_FAULT_SECONDARY + ".basicAuthPassword"; - private static final String VES_FAULT_SND_CAMBRIA_TOPIC_KEY = VES_FAULT_SECONDARY + ".cambria.topic"; - private static final String FAULT_UEB_CAMBRIA_HOSTS_KEY = FAULT_UEB_KEY_PREFIX + ".cambria.hosts"; - private static final String FAULT_UEB_CAMBRIA_TOPIC_KEY = FAULT_UEB_KEY_PREFIX + ".cambria.topic"; - private static final String VES_ALERT_SND_AUTH_USERNAME_KEY = VES_ALERT_SND_KEY_PREFIX + ".basicAuthUsername"; - - private static final String NULL_TOSTRING = "null"; - - private static final Map expectedCompleteGen2DmaapConfig = ImmutableMap.builder() - .put(ALERT_BASIC_AUTH_PWD_KEY, "SamplePassWD2") - .put(VES_ALERT_CAMBRIA_TOPIC_KEY, "DCAE-SE-COLLECTOR-EVENTS-DEV") - .put(FAULT_BASIC_AUTH_USERNAME_KEY, "sampleUsername") - .put(VES_ALERT_CAMBRIA_URL_KEY, "UEBHOST:3904") - .put(VES_FAULT_SND_CAMBRIA_URL_KEY, "UEBHOST:3904") - .put(VES_FAULT_SND_AUTH_PWD_KEY, "SamplePasswd") - .put(VES_FAULT_SND_CAMBRIA_TOPIC_KEY, "DCAE-SE-COLLECTOR-EVENTS-DEV") - .put(VES_ALERT_SND_AUTH_USERNAME_KEY, "sampleUsername2") - .build(); - - private static final Map expectedIncompleteGen2DmaapConfig = ImmutableMap.builder() - .put(VES_ALERT_SND_AUTH_USERNAME_KEY, NULL_TOSTRING) - .put(FAULT_BASIC_AUTH_USERNAME_KEY, NULL_TOSTRING) - .put(VES_ALERT_CAMBRIA_TOPIC_KEY, "DCAE-SE-COLLECTOR-EVENTS-DEV") - .put(VES_ALERT_CAMBRIA_URL_KEY, "UEBHOST:3904") - .put(VES_FAULT_SND_CAMBRIA_URL_KEY, "UEBHOST:3904") - .put(ALERT_BASIC_AUTH_PWD_KEY, NULL_TOSTRING) - .put(VES_FAULT_SND_AUTH_PWD_KEY, NULL_TOSTRING) - .put(VES_FAULT_SND_CAMBRIA_TOPIC_KEY, "DCAE-SE-COLLECTOR-EVENTS-DEV") - .build(); - - private static final Map expectedCompleteChannelsDmaapConfig = ImmutableMap.builder() - .put(FAULT_UEB_CAMBRIA_HOSTS_KEY, "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com") - .put(FAULT_UEB_CAMBRIA_TOPIC_KEY, "DCAE-SE-COLLECTOR-EVENTS-DEV") - .put(FAULT_UEB_KEY_PREFIX + ".basicAuthPassword", "S0mEPassWD") - .put(FAULT_UEB_KEY_PREFIX + ".basicAuthUsername", "sampleUser") - .put(FAULT_UEB_KEY_PREFIX + ".cambria.url", "127.0.0.1:3904") - .build(); - - private static final Map expectedIncompleteChannelsDmaapConfig = ImmutableMap.builder() - .put(FAULT_UEB_CAMBRIA_HOSTS_KEY, "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com") - .put(FAULT_UEB_CAMBRIA_TOPIC_KEY, "DCAE-SE-COLLECTOR-EVENTS-DEV") - .build(); - - @Test - public void testShouldCreateReaderWithAbsentParamsOmittedBasedOnChannelDmaapConfig() { - assertReaderPreservedAllEntriesAfterTransformation(legacyConfigFilePath, expectedIncompleteChannelsDmaapConfig); - } - - @Test - public void testShouldCreateReaderWithAbsentParamsOmittedBasedOnGen2DmaapConfig() { - assertReaderPreservedAllEntriesAfterTransformation(dmaapInputConfigFilePath, expectedIncompleteGen2DmaapConfig); - } - - @Test - public void shouldCreateReaderWithCompleteChannelDmaapConfig() { - assertReaderPreservedAllEntriesAfterTransformation(fullDmaapConfigWithChannels, expectedCompleteChannelsDmaapConfig); - } - - @Test - public void shouldCreateReaderWithCompleteGen2DmaapConfig() { - assertReaderPreservedAllEntriesAfterTransformation(fullGen2DmaapConfig, expectedCompleteGen2DmaapConfig); - } - - private void assertReaderPreservedAllEntriesAfterTransformation(String dmaapConfigFilePath, Map expectedMap) { - DmaapPropertyReader reader = new DmaapPropertyReader(dmaapConfigFilePath); - - assertThat(reader.getDmaapProperties()).containsAllEntriesOf(expectedMap); - assertThat(expectedMap).containsAllEntriesOf(reader.getDmaapProperties()); - } - -} - diff --git a/src/test/resources/testDmaapConfig_gen2.json b/src/test/resources/testDmaapConfig_gen2.json deleted file mode 100644 index d8097bf1..00000000 --- a/src/test/resources/testDmaapConfig_gen2.json +++ /dev/null @@ -1,24 +0,0 @@ -{ - "ves-fault-secondary": { - "aaf_username": null, - "dmaap_info": { - "topic_url": "http://UEBHOST:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV", - "location": "mtl5", - "client_id": null, - "client_role": null - }, - "type": "message_router", - "aaf_password": null - }, - "ves-thresholdCrossingAlert-secondary": { - "aaf_username": null, - "dmaap_info": { - "topic_url": "http://UEBHOST:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV", - "location": "mtl5", - "client_id": null, - "client_role": null - }, - "type": "message_router", - "aaf_password": null - } - } \ No newline at end of file diff --git a/src/test/resources/testFullDmaapConfig_channels.json b/src/test/resources/testFullDmaapConfig_channels.json deleted file mode 100644 index e2bc0339..00000000 --- a/src/test/resources/testFullDmaapConfig_channels.json +++ /dev/null @@ -1,13 +0,0 @@ -{ - "channels": [ - { - "name": "sec_fault_ueb", - "cambria.url": "127.0.0.1:3904", - "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com", - "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV", - "basicAuthPassword": "S0mEPassWD", - "basicAuthUsername": "sampleUser", - "stripHpId": "true" - } - ] -} \ No newline at end of file diff --git a/src/test/resources/testParseDMaaPCredentialsGen2.json b/src/test/resources/testParseDMaaPCredentialsGen2.json new file mode 100644 index 00000000..953cb6e8 --- /dev/null +++ b/src/test/resources/testParseDMaaPCredentialsGen2.json @@ -0,0 +1,21 @@ +{ + "auth-credentials-null": { + "aaf_username": null, + "dmaap_info": { + "topic_url": "http://UEBHOST:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV", + }, + "aaf_password": null + }, + "auth-credentials-present": { + "aaf_username": "sampleUser", + "dmaap_info": { + "topic_url": "http://UEBHOST:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV", + }, + "aaf_password": "samplePassword" + }, + "auth-credentials-missing": { + "dmaap_info": { + "topic_url": "http://UEBHOST:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV", + } + } +} \ No newline at end of file diff --git a/src/test/resources/testParseDMaaPCredentialsLegacy.json b/src/test/resources/testParseDMaaPCredentialsLegacy.json new file mode 100644 index 00000000..ca59c7e7 --- /dev/null +++ b/src/test/resources/testParseDMaaPCredentialsLegacy.json @@ -0,0 +1,26 @@ +{ + "channels": [ + { + "name": "auth-credentials-null", + "cambria.url": "127.0.0.1:3904", + "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com", + "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV", + "basicAuthPassword": null, + "basicAuthUsername": null, + }, + { + "name": "auth-credentials-present", + "cambria.url": "127.0.0.1:3904", + "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com", + "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV", + "basicAuthPassword": "samplePassword", + "basicAuthUsername": "sampleUser", + }, + { + "name": "auth-credentials-missing", + "cambria.url": "127.0.0.1:3904", + "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com", + "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV", + } + ] +} \ No newline at end of file diff --git a/src/test/resources/testParseDMaaPGen2.json b/src/test/resources/testParseDMaaPGen2.json new file mode 100644 index 00000000..5b4fe6a6 --- /dev/null +++ b/src/test/resources/testParseDMaaPGen2.json @@ -0,0 +1,12 @@ +{ + "event-segments-with-port": { + "dmaap_info": { + "topic_url": "http://UEBHOST:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV", + } + }, + "other-segments-without-ports": { + "dmaap_info": { + "topic_url": "http://UEBHOST:3904/somethingHere/DCAE-SE-COLLECTOR-EVENTS-DEV", + } + } +} \ No newline at end of file diff --git a/src/test/resources/testParseDMaaPLegacy.json b/src/test/resources/testParseDMaaPLegacy.json new file mode 100644 index 00000000..9661e30c --- /dev/null +++ b/src/test/resources/testParseDMaaPLegacy.json @@ -0,0 +1,21 @@ +{ + "channels": [ + { + "name": "url-precedes-hosts", + "cambria.url": "127.0.0.1:3904", + "cambria.hosts": "h1.att.com,h2.att.com", + "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV", + }, + { + "name": "url-key-missing", + "cambria.hosts": "h1.att.com,h2.att.com", + "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV", + }, + { + "name": "url-is-null", + "cambria.url": null, + "cambria.hosts": "h1.att.com,h2.att.com", + "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV" + } + ] +} \ No newline at end of file -- cgit 1.2.3-korg