diff options
Diffstat (limited to 'src/main/java/org/onap/dcae/commonFunction')
8 files changed, 111 insertions, 213 deletions
diff --git a/src/main/java/org/onap/dcae/commonFunction/AnyNode.java b/src/main/java/org/onap/dcae/commonFunction/AnyNode.java index b09a17a0..267c87a9 100644 --- a/src/main/java/org/onap/dcae/commonFunction/AnyNode.java +++ b/src/main/java/org/onap/dcae/commonFunction/AnyNode.java @@ -45,27 +45,21 @@ import java.util.stream.StreamSupport; * @author koblosz */ public class AnyNode { - private static final Logger LOGGER = LoggerFactory.getLogger(AnyNode.class); - private Object obj; + private final Object obj; + private static final Logger log = LoggerFactory.getLogger(AnyNode.class); - /** - * @param filePath - * @return - * @throws IOException - */ public static AnyNode parse(String filePath) throws IOException { try (FileReader fr = new FileReader(filePath)) { - JSONTokener tokener = new JSONTokener(fr); - return new AnyNode(new JSONObject(tokener)); + return new AnyNode(new JSONObject(new JSONTokener(fr))); } catch (FileNotFoundException | JSONException e1) { - LOGGER.error("Could not find or parse file under path %s due to: %s", filePath, e1.toString()); + log.error("Could not find or parse file under path %s due to: %s", filePath, e1.toString()); e1.printStackTrace(); throw e1; } } /** - * Returns keyset of underlying object. It is assumed that underlying object is of type org.json.JSONObject. + * Returns key set of underlying object. It is assumed that underlying object is of type org.json.JSONObject. * * @return Set of string keys present in underlying JSONObject */ @@ -131,12 +125,12 @@ public class AnyNode { AnyNode result = null; try { result = get(key); - } catch (JSONException ex) { + } catch (JSONException ignored) { } return Optional.ofNullable(result); } - public JSONObject asJsonObject() { + private JSONObject asJsonObject() { return (JSONObject) this.obj; } @@ -159,14 +153,14 @@ public class AnyNode { /** * Converts this object to stream of underlying objects wrapped with AnyNode class. It is assumed that this is of type JSONArray. */ - public Stream<AnyNode> asStream() { + private Stream<AnyNode> asStream() { return StreamSupport.stream(((JSONArray) this.obj).spliterator(), false).map(AnyNode::new); } /** * Checks if specified key is present in this. It is assumed that this is of type JSONObject. */ - public boolean hasKey(String key) { + boolean hasKey(String key) { return getAsOptional(key).isPresent(); } diff --git a/src/main/java/org/onap/dcae/commonFunction/CambriaPublisherFactory.java b/src/main/java/org/onap/dcae/commonFunction/CambriaPublisherFactory.java index 41230a14..79109c04 100644 --- a/src/main/java/org/onap/dcae/commonFunction/CambriaPublisherFactory.java +++ b/src/main/java/org/onap/dcae/commonFunction/CambriaPublisherFactory.java @@ -30,38 +30,34 @@ import org.slf4j.LoggerFactory; class CambriaPublisherFactory { - private static Logger log = LoggerFactory.getLogger(CambriaPublisherFactory.class); + private final static Logger log = LoggerFactory.getLogger(CambriaPublisherFactory.class); - public CambriaBatchingPublisher createCambriaPublisher(String streamId) + CambriaBatchingPublisher createCambriaPublisher(String streamId) throws MalformedURLException, GeneralSecurityException { String authpwd = null; DmaapPropertyReader reader = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile); - Map<String, String> dmaapProperties = reader.getDmaapProperties(); - String ueburl = dmaapProperties.get(streamId + ".cambria.url"); + Map<String, String> dMaaPProperties = reader.getDmaapProperties(); + String ueburl = dMaaPProperties.get(streamId + ".cambria.url"); if (ueburl == null) { - ueburl = dmaapProperties.get(streamId + ".cambria.hosts"); + ueburl = dMaaPProperties.get(streamId + ".cambria.hosts"); } String topic = reader.getKeyValue(streamId + ".cambria.topic"); String authuser = reader.getKeyValue(streamId + ".basicAuthUsername"); if (authuser != null) { - authpwd = dmaapProperties.get(streamId + ".basicAuthPassword"); + authpwd = dMaaPProperties.get(streamId + ".basicAuthPassword"); } if ((authuser != null) && (authpwd != null)) { log.debug(String.format("URL:%sTOPIC:%sAuthUser:%sAuthpwd:%s", ueburl, topic, authuser, authpwd)); return new CambriaClientBuilders.PublisherBuilder().usingHosts(ueburl).onTopic(topic).usingHttps() .authenticatedByHttp(authuser, authpwd).logSendFailuresAfter(5) - // .logTo(log) - // .limitBatch(100, 10) .build(); } else { log.debug(String.format("URL:%sTOPIC:%s", ueburl, topic)); return new CambriaClientBuilders.PublisherBuilder().usingHosts(ueburl).onTopic(topic) - // .logTo(log) .logSendFailuresAfter(5) - // .limitBatch(100, 10) .build(); } } diff --git a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java index 0ca25e2b..b60104ea 100644 --- a/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java +++ b/src/main/java/org/onap/dcae/commonFunction/CommonStartup.java @@ -29,7 +29,6 @@ import com.att.nsa.drumlin.till.nv.impl.nvPropertiesFile; import com.att.nsa.drumlin.till.nv.impl.nvReadableStack;
import com.att.nsa.drumlin.till.nv.impl.nvReadableTable;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
import com.att.nsa.drumlin.till.nv.rrNvReadable.loadException;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
import com.fasterxml.jackson.core.JsonParseException;
@@ -51,125 +50,95 @@ import org.slf4j.LoggerFactory; import java.io.IOException;
import java.net.URL;
-import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import javax.servlet.ServletException;
public class CommonStartup extends NsaBaseEndpoint implements Runnable {
- public static final String KCONFIG = "c";
-
- public static final String KSETTING_PORT = "collector.service.port";
- public static final int KDEFAULT_PORT = 8080;
-
- public static final String KSETTING_SECUREPORT = "collector.service.secure.port";
- public static final int KDEFAULT_SECUREPORT = -1;
-
- public static final String KSETTING_KEYSTOREPASSFILE = "collector.keystore.passwordfile";
- public static final String KDEFAULT_KEYSTOREPASSFILE = "../etc/passwordfile";
- public static final String KSETTING_KEYSTOREFILE = "collector.keystore.file.location";
- public static final String KDEFAULT_KEYSTOREFILE = "../etc/keystore";
- public static final String KSETTING_KEYALIAS = "collector.keystore.alias";
- public static final String KDEFAULT_KEYALIAS = "tomcat";
-
- public static final String KSETTING_DMAAPCONFIGS = "collector.dmaapfile";
- protected static final String[] KDEFAULT_DMAAPCONFIGS = new String[] { "/etc/DmaapConfig.json" };
-
- public static final String KSETTING_MAXQUEUEDEVENTS = "collector.inputQueue.maxPending";
- public static final int KDEFAULT_MAXQUEUEDEVENTS = 1024 * 4;
-
- public static final String KSETTING_SCHEMAVALIDATOR = "collector.schema.checkflag";
- public static final int KDEFAULT_SCHEMAVALIDATOR = -1;
-
- public static final String KSETTING_SCHEMAFILE = "collector.schema.file";
- public static final String KDEFAULT_SCHEMAFILE = "{\"v5\":\"./etc/CommonEventFormat_28.3.json\"}";
-
- public static final String KSETTING_DMAAPSTREAMID = "collector.dmaap.streamid";
-
- public static final String KSETTING_AUTHFLAG = "header.authflag";
- public static final int KDEFAULT_AUTHFLAG = 0;
-
- public static final String KSETTING_AUTHLIST = "header.authlist";
-
- public static final String KSETTING_EVENTTRANSFORMFLAG = "event.transform.flag";
- public static final int KDEFAULT_EVENTTRANSFORMFLAG = 1;
-
+ private static final String KCONFIG = "c";
+ private static final String KSETTING_PORT = "collector.service.port";
+ private static final int KDEFAULT_PORT = 8080;
+ private static final String KSETTING_SECUREPORT = "collector.service.secure.port";
+ private static final int KDEFAULT_SECUREPORT = -1;
+ private static final String KSETTING_KEYSTOREPASSFILE = "collector.keystore.passwordfile";
+ private static final String KDEFAULT_KEYSTOREPASSFILE = "../etc/passwordfile";
+ private static final String KSETTING_KEYSTOREFILE = "collector.keystore.file.location";
+ private static final String KDEFAULT_KEYSTOREFILE = "../etc/keystore";
+ private static final String KSETTING_KEYALIAS = "collector.keystore.alias";
+ private static final String KDEFAULT_KEYALIAS = "tomcat";
+ private static final String KSETTING_DMAAPCONFIGS = "collector.dmaapfile";
+ private static final String[] KDEFAULT_DMAAPCONFIGS = new String[] { "/etc/DmaapConfig.json" };
+ private static final String KSETTING_SCHEMAVALIDATOR = "collector.schema.checkflag";
+ private static final int KDEFAULT_SCHEMAVALIDATOR = -1;
+ private static final String KSETTING_SCHEMAFILE = "collector.schema.file";
+ private static final String KDEFAULT_SCHEMAFILE = "{\"v5\":\"./etc/CommonEventFormat_28.3.json\"}";
+ private static final String KSETTING_DMAAPSTREAMID = "collector.dmaap.streamid";
+ private static final String KSETTING_AUTHFLAG = "header.authflag";
+ private static final int KDEFAULT_AUTHFLAG = 0;
+ private static final String KSETTING_EVENTTRANSFORMFLAG = "event.transform.flag";
+ private static final int KDEFAULT_EVENTTRANSFORMFLAG = 1;
+ private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");
public static final Logger inlog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.input");
- public static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.output");
+ static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.output");
public static final Logger eplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.error");
- public static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");
+ public static final String KSETTING_AUTHLIST = "header.authlist";
+ static final int KDEFAULT_MAXQUEUEDEVENTS = 1024 * 4;
public static int schemaValidatorflag = -1;
public static int authflag = 1;
- public static int eventTransformFlag = 1;
- public static String schemaFile;
+ static int eventTransformFlag = 1;
public static JSONObject schemaFileJson;
- public static String cambriaConfigFile;
- private boolean listnerstatus;
- public static String streamid;
+ static String cambriaConfigFile;
+ public static String streamID;
- public static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;
+ static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;
private static ApiServer fTomcatServer = null;
private static final Logger log = LoggerFactory.getLogger(CommonStartup.class);
- private CommonStartup(rrNvReadable settings) throws loadException, IOException, rrNvReadable.missingReqdSetting,
- rrNvReadable.invalidSettingValue, ServletException, InterruptedException {
- final List<ApiServerConnector> connectors = new LinkedList<ApiServerConnector>();
+ private CommonStartup(rrNvReadable settings) throws loadException, IOException, rrNvReadable.missingReqdSetting {
+ final List<ApiServerConnector> connectors = new LinkedList<>();
if (settings.getInt(KSETTING_PORT, KDEFAULT_PORT) > 0) {
- // http service
connectors.add(new ApiServerConnector.Builder(settings.getInt(KSETTING_PORT, KDEFAULT_PORT)).secure(false)
.build());
}
- // optional https service
final int securePort = settings.getInt(KSETTING_SECUREPORT, KDEFAULT_SECUREPORT);
final String keystoreFile = settings.getString(KSETTING_KEYSTOREFILE, KDEFAULT_KEYSTOREFILE);
final String keystorePasswordFile = settings.getString(KSETTING_KEYSTOREPASSFILE, KDEFAULT_KEYSTOREPASSFILE);
final String keyAlias = settings.getString(KSETTING_KEYALIAS, KDEFAULT_KEYALIAS);
if (securePort > 0) {
- final String KSETTING_KEYSTOREPASS = readFile(keystorePasswordFile);
+ String keystorePassword = readFile(keystorePasswordFile);
connectors.add(new ApiServerConnector.Builder(securePort).secure(true)
- .keystorePassword(KSETTING_KEYSTOREPASS).keystoreFile(keystoreFile).keyAlias(keyAlias).build());
+ .keystorePassword(keystorePassword).keystoreFile(keystoreFile).keyAlias(keyAlias).build());
}
- // Reading other config properties
-
schemaValidatorflag = settings.getInt(KSETTING_SCHEMAVALIDATOR, KDEFAULT_SCHEMAVALIDATOR);
if (schemaValidatorflag > 0) {
- schemaFile = settings.getString(KSETTING_SCHEMAFILE, KDEFAULT_SCHEMAFILE);
- // System.out.println("SchemaFile:" + schemaFile);
+ String schemaFile = settings.getString(KSETTING_SCHEMAFILE, KDEFAULT_SCHEMAFILE);
schemaFileJson = new JSONObject(schemaFile);
}
authflag = settings.getInt(CommonStartup.KSETTING_AUTHFLAG, CommonStartup.KDEFAULT_AUTHFLAG);
- String[] currentconffile = settings.getStrings(CommonStartup.KSETTING_DMAAPCONFIGS,
- CommonStartup.KDEFAULT_DMAAPCONFIGS);
- cambriaConfigFile = currentconffile[0];
- streamid = settings.getString(KSETTING_DMAAPSTREAMID, null);
+ String[] currentConfigFile = settings.getStrings(KSETTING_DMAAPCONFIGS, KDEFAULT_DMAAPCONFIGS);
+ cambriaConfigFile = currentConfigFile[0];
+ streamID = settings.getString(KSETTING_DMAAPSTREAMID, null);
eventTransformFlag = settings.getInt(KSETTING_EVENTTRANSFORMFLAG, KDEFAULT_EVENTTRANSFORMFLAG);
fTomcatServer = new ApiServer.Builder(connectors, new RestfulCollectorServlet(settings)).encodeSlashes(true)
.name("collector").build();
-
- setListnerstatus(true);
}
public static void main(String[] args) {
- ExecutorService executor = null;
try {
- // process command line arguments
final Map<String, String> argMap = NsaCommandLineUtil.processCmdLine(args, true);
final String config = NsaCommandLineUtil.getSetting(argMap, KCONFIG, "collector.properties");
final URL settingStream = DrumlinServlet.findStream(config, CommonStartup.class);
@@ -178,101 +147,66 @@ public class CommonStartup extends NsaBaseEndpoint implements Runnable { settings.push(new nvPropertiesFile(settingStream));
settings.push(new nvReadableTable(argMap));
- fProcessingInputQueue = new LinkedBlockingQueue<JSONObject>(CommonStartup.KDEFAULT_MAXQUEUEDEVENTS);
+ fProcessingInputQueue = new LinkedBlockingQueue<>(CommonStartup.KDEFAULT_MAXQUEUEDEVENTS);
VESLogger.setUpEcompLogging();
CommonStartup cs = new CommonStartup(settings);
- Thread csmain = new Thread(cs);
- csmain.start();
+ Thread commonStartupThread = new Thread(cs);
+ commonStartupThread.start();
EventProcessor ep = new EventProcessor();
- executor = Executors.newFixedThreadPool(20);
- //executor.execute(ep);
+ ExecutorService executor = Executors.newFixedThreadPool(20);
for (int i = 0; i < 20; ++i) {
executor.execute(ep);
- }
+ }
- } catch (loadException | missingReqdSetting | IOException | invalidSettingValue | ServletException
- | InterruptedException e) {
+ } catch (loadException | missingReqdSetting | IOException e) {
CommonStartup.eplog.error("FATAL_STARTUP_ERROR" + e.getMessage());
throw new RuntimeException(e);
} catch (Exception e) {
System.err.println("Uncaught exception - " + e.getMessage());
CommonStartup.eplog.error("FATAL_ERROR" + e.getMessage());
e.printStackTrace(System.err);
- } finally {
- // This will make the executor accept no new threads
- // and finish all existing threads in the queue
- /*if (executor != null) {
- executor.shutdown();
- }*/
-
}
}
public void run() {
try {
fTomcatServer.start();
+ fTomcatServer.await();
} catch (LifecycleException | IOException e) {
-
- log.error("lifecycle or IO: ", e);
+ throw new RuntimeException(e);
}
- fTomcatServer.await();
- }
-
- public boolean isListnerstatus() {
- return listnerstatus;
- }
-
- public void setListnerstatus(boolean listnerstatus) {
- this.listnerstatus = listnerstatus;
- }
-
- public static Queue<JSONObject> getProcessingInputQueue() {
- return fProcessingInputQueue;
}
public static class QueueFullException extends Exception {
-
private static final long serialVersionUID = 1L;
}
- public static void handleEvents(JSONArray a) throws QueueFullException, JSONException, IOException {
- final Queue<JSONObject> queue = getProcessingInputQueue();
- try {
-
- CommonStartup.metriclog.info("EVENT_PUBLISH_START");
- for (int i = 0; i < a.length(); i++) {
- if (!queue.offer(a.getJSONObject(i))) {
- throw new QueueFullException();
- }
-
+ public static void handleEvents(JSONArray a) throws QueueFullException, JSONException {
+ CommonStartup.metriclog.info("EVENT_PUBLISH_START");
+ for (int i = 0; i < a.length(); i++) {
+ if (!fProcessingInputQueue.offer(a.getJSONObject(i))) {
+ throw new QueueFullException();
}
- log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
- CommonStartup.metriclog.info("EVENT_PUBLISH_END");
- // ecomplogger.debug(secloggerMessageEnum.SEC_COLLECT_AND_PULIBISH_SUCCESS);
-
- } catch (JSONException e) {
- throw e;
-
}
+ log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
+ CommonStartup.metriclog.info("EVENT_PUBLISH_END");
}
- static String readFile(String path) throws IOException {
+ private static String readFile(String path) throws IOException {
byte[] encoded = Files.readAllBytes(Paths.get(path));
String pwd = new String(encoded);
return pwd.substring(0, pwd.length() - 1);
}
- public static String schemavalidate(String jsonData, String jsonSchema) {
+ public static String validateAgainstSchema(String jsonData, String jsonSchema) {
ProcessingReport report;
String result = "false";
try {
- // System.out.println("Applying schema: @<@<"+jsonSchema+">@>@ to
- // data: #<#<"+jsonData+">#>#");
log.trace("Schema validation for event:" + jsonData);
JsonNode schemaNode = JsonLoader.fromString(jsonSchema);
JsonNode data = JsonLoader.fromString(jsonData);
@@ -280,20 +214,18 @@ public class CommonStartup extends NsaBaseEndpoint implements Runnable { JsonSchema schema = factory.getJsonSchema(schemaNode);
report = schema.validate(data);
} catch (JsonParseException e) {
- log.error("schemavalidate:JsonParseException for event:" + jsonData);
+ log.error("validateAgainstSchema:JsonParseException for event:" + jsonData);
return e.getMessage();
} catch (ProcessingException e) {
- log.error("schemavalidate:Processing exception for event:" + jsonData);
+ log.error("validateAgainstSchema:Processing exception for event:" + jsonData);
return e.getMessage();
} catch (IOException e) {
log.error(
- "schemavalidate:IO exception; something went wrong trying to read json data for event:" + jsonData);
+ "validateAgainstSchema:IO exception; something went wrong trying to read json data for event:" + jsonData);
return e.getMessage();
}
if (report != null) {
- Iterator<ProcessingMessage> iter = report.iterator();
- while (iter.hasNext()) {
- ProcessingMessage pm = iter.next();
+ for (ProcessingMessage pm : report) {
log.trace("Processing Message: " + pm.getMessage());
}
result = String.valueOf(report.isSuccess());
@@ -301,7 +233,7 @@ public class CommonStartup extends NsaBaseEndpoint implements Runnable { try {
log.debug("Validation Result:" + result + " Validation report:" + report);
} catch (NullPointerException e) {
- log.error("schemavalidate:NullpointerException on report");
+ log.error("validateAgainstSchema:NullpointerException on report");
}
return result;
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java b/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java index 80a65f08..a6de0fc8 100644 --- a/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java +++ b/src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory; public class ConfigProcessors { - private static Logger log = LoggerFactory.getLogger(ConfigProcessors.class); + private static final Logger log = LoggerFactory.getLogger(ConfigProcessors.class); private static final String FIELD = "field"; private static final String OLD_FIELD = "oldField"; private static final String FILTER = "filter"; @@ -39,7 +39,7 @@ public class ConfigProcessors { private static final String FILTER_NOT_MET = "Filter not met"; private static final String COMP_FALSE = "==false"; - private JSONObject event; + private final JSONObject event; public ConfigProcessors(JSONObject eventJson) { event = eventJson; @@ -170,7 +170,7 @@ public class ConfigProcessors { private String performOperation(String operation, String value) { log.info("performOperation"); - if (operation != null && "convertMBtoKB".equals(operation)) { + if ("convertMBtoKB".equals(operation)) { float kbValue = Float.parseFloat(value) * 1024; value = String.valueOf(kbValue); } @@ -184,7 +184,7 @@ public class ConfigProcessors { final String oldField = jsonObject.getString(OLD_FIELD); final JSONObject filter = jsonObject.optJSONObject(FILTER); final String operation = jsonObject.optString("operation"); - String value = ""; + String value; if (filter == null || isFilterMet(filter)) { value = getEventObjectVal(oldField).toString(); @@ -267,19 +267,19 @@ public class ConfigProcessors { final JSONArray values = jsonObject.getJSONArray("concatenate"); final JSONObject filter = jsonObject.optJSONObject(FILTER); if (filter == null || isFilterMet(filter)) { - String value = ""; + StringBuilder value = new StringBuilder(); for (int i = 0; i < values.length(); i++) { String tempVal = evaluate(values.getString(i)); if (!tempVal.equals(OBJECT_NOT_FOUND)) { if (i == 0) - value = value + tempVal; + value.append(tempVal); else - value = value + delimiter + tempVal; + value.append(delimiter).append(tempVal); } } - setEventObjectVal(field, value); + setEventObjectVal(field, value.toString()); } else log.info(FILTER_NOT_MET); } @@ -323,8 +323,6 @@ public class ConfigProcessors { private boolean checkFilter(JSONObject jo, String key, String logicKey) { String filterValue = jo.getString(key); - boolean retVal = true; - if (filterValue.contains(":")) { String[] splitVal = filterValue.split(":"); if ("matches".equals(splitVal[0])) { @@ -368,26 +366,21 @@ public class ConfigProcessors { } } } - return retVal; + return true; } public boolean isFilterMet(JSONObject jo) { - boolean retval = true; - for (String key : jo.keySet()) { if ("not".equals(key)) { JSONObject njo = jo.getJSONObject(key); for (String njoKey : njo.keySet()) { - - retval = checkFilter(njo, njoKey, key); - if (!retval) - return retval; + if (!checkFilter(njo, njoKey, key)) + return false; } } else { - retval = checkFilter(jo, key, key); - if (!retval) - return retval; + if (!checkFilter(jo, key, key)) + return false; } } return true; @@ -407,17 +400,16 @@ public class ConfigProcessors { keySeriesStr = keySeriesStr.substring(0, keySeriesStr.length() - 1); String[] keySet = keySeriesStr.split("\\.", keySeriesStr.length()); Object keySeriesObj = event; - for (int i = 0; i < (keySet.length); i++) { - + for (String aKeySet : keySet) { if (keySeriesObj != null) { if (keySeriesObj instanceof String) { log.info("STRING==" + keySeriesObj); } else if (keySeriesObj instanceof JSONArray) { - keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(keySet[i])); + keySeriesObj = ((JSONArray) keySeriesObj).optJSONObject(Integer.parseInt(aKeySet)); } else if (keySeriesObj instanceof JSONObject) { - keySeriesObj = ((JSONObject) keySeriesObj).opt(keySet[i]); + keySeriesObj = ((JSONObject) keySeriesObj).opt(aKeySet); } else { log.info("unknown object==" + keySeriesObj); diff --git a/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java b/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java index c0659aa6..0ee1e434 100644 --- a/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java +++ b/src/main/java/org/onap/dcae/commonFunction/DmaapPropertyReader.java @@ -68,9 +68,9 @@ public class DmaapPropertyReader { Map<String, String> transformedDmaapProperties = new HashMap<>(); try { AnyNode root = AnyNode.parse(configFilePath); - if (root.hasKey("channels")) { // Check if dmaap config is handled by legacy controller/service/manager + if (isInLegacyFormat(root)) { transformedDmaapProperties = getLegacyDmaapPropertiesWithChannels(root.get("channels")); - } else {//Handing new format from controllergen2/config_binding_service + } else { transformedDmaapProperties = getDmaapPropertiesWithInfoData(root); } } catch (IOException e) { @@ -79,6 +79,10 @@ public class DmaapPropertyReader { return transformedDmaapProperties; } + private static boolean isInLegacyFormat(AnyNode root) { + return root.hasKey("channels"); + } + private static Map<String, String> getLegacyDmaapPropertiesWithChannels(AnyNode channelsNode) { return channelsNode.asList().stream() .map(DmaapPropertyReader::getTransformedMandatoryChannelProperties) diff --git a/src/main/java/org/onap/dcae/commonFunction/DmaapPublishers.java b/src/main/java/org/onap/dcae/commonFunction/DmaapPublishers.java index c86a3742..a4c62719 100644 --- a/src/main/java/org/onap/dcae/commonFunction/DmaapPublishers.java +++ b/src/main/java/org/onap/dcae/commonFunction/DmaapPublishers.java @@ -25,7 +25,6 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; import java.io.IOException; import java.net.MalformedURLException; import java.security.GeneralSecurityException; @@ -36,7 +35,7 @@ import org.slf4j.LoggerFactory; class DmaapPublishers { - private static Logger log = LoggerFactory.getLogger(DmaapPublishers.class); + private static final Logger log = LoggerFactory.getLogger(DmaapPublishers.class); private final LoadingCache<String, CambriaBatchingPublisher> publishers; private DmaapPublishers( @@ -50,13 +49,9 @@ class DmaapPublishers { static DmaapPublishers create(final CambriaPublisherFactory publisherFactory) { final LoadingCache<String, CambriaBatchingPublisher> cache = CacheBuilder.<String, CambriaBatchingPublisher>newBuilder() -// .expireAfterAccess(10, TimeUnit.MINUTES) - .removalListener(new RemovalListener<String, CambriaBatchingPublisher>() { - @Override - public void onRemoval(RemovalNotification<String, CambriaBatchingPublisher> notification) { - if (notification.getValue() != null) { - onCacheItemInvalidated(notification.getValue()); - } + .removalListener((RemovalListener<String, CambriaBatchingPublisher>) notification -> { + if (notification.getValue() != null) { + onCacheItemInvalidated(notification.getValue()); } }) .build(new CacheLoader<String, CambriaBatchingPublisher>() { @@ -75,11 +70,11 @@ class DmaapPublishers { return new DmaapPublishers(cache); } - public CambriaBatchingPublisher getByStreamId(String streamId) { + CambriaBatchingPublisher getByStreamId(String streamId) { return publishers.getUnchecked(streamId); } - public void closeByStreamId(String streamId) { + void closeByStreamId(String streamId) { publishers.invalidate(streamId); } diff --git a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java index 6dd2f5c8..06a27328 100644 --- a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java +++ b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java @@ -41,7 +41,7 @@ import java.util.HashMap; import java.util.List;
-public class EventProcessor implements Runnable {
+class EventProcessor implements Runnable {
private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
private static final String EVENT_LITERAL = "event";
@@ -53,8 +53,8 @@ public class EventProcessor implements Runnable { static Map<String, String[]> streamidHash = new HashMap<>();
public JSONObject event;
- public EventProcessor() {
- streamidHash = parseStreamIdToStreamHashMapping(CommonStartup.streamid);
+ EventProcessor() {
+ streamidHash = parseStreamIdToStreamHashMapping(CommonStartup.streamID);
}
private Map<String, String[]> parseStreamIdToStreamHashMapping(String streamId) {
@@ -63,7 +63,6 @@ public class EventProcessor implements Runnable { for (String aList : list) {
String domain = aList.split("=")[0];
String[] streamIdList = aList.substring(aList.indexOf('=') + 1).split(",");
-
streamidHash.put(domain, streamIdList);
}
return streamidHash;
@@ -72,12 +71,9 @@ public class EventProcessor implements Runnable { @Override
public void run() {
-
try {
-
- event = CommonStartup.fProcessingInputQueue.take();
-
- while (event != null) {
+ while (true) {
+ event = CommonStartup.fProcessingInputQueue.take();
// As long as the producer is running we remove elements from
// the queue.
log.info("QueueSize:" + CommonStartup.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event);
@@ -97,8 +93,6 @@ public class EventProcessor implements Runnable { sendEventsToStreams(streamIdList);
}
log.debug("Message published" + event);
- event = CommonStartup.fProcessingInputQueue.take();
-
}
} catch (InterruptedException e) {
log.error("EventProcessor InterruptedException" + e.getMessage());
diff --git a/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java b/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java index 474424a7..f3907126 100644 --- a/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java +++ b/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java @@ -38,11 +38,6 @@ public class EventPublisherHash { private static volatile EventPublisherHash instance = new EventPublisherHash(DmaapPublishers.create()); private final DmaapPublishers dmaapPublishers; - /** - * Returns event publisher - * - * @return event publisher - */ public static EventPublisherHash getInstance() { return instance; } @@ -52,14 +47,14 @@ public class EventPublisherHash { this.dmaapPublishers = dmaapPublishers; } - public void sendEvent(JSONObject event, String streamid) { + void sendEvent(JSONObject event, String streamid) { log.debug("EventPublisher.sendEvent: instance for publish is ready"); clearVesUniqueId(event); try { sendEventUsingCachedPublisher(streamid, event); } catch (IOException | IllegalArgumentException e) { - log.error("Unable to publish event: {} streamid: {}. Exception: {}", event, streamid, e); + log.error("Unable to publish event: {} streamID: {}. Exception: {}", event, streamid, e); dmaapPublishers.closeByStreamId(streamid); } } @@ -76,19 +71,15 @@ public class EventPublisherHash { private void sendEventUsingCachedPublisher(String streamid, JSONObject event) throws IOException { int pendingMsgs = dmaapPublishers.getByStreamId(streamid).send("MyPartitionKey", event.toString()); - // this.wait(2000); - if (pendingMsgs > 100) { log.info("Pending Message Count=" + pendingMsgs); } - log.info("pub.send invoked - no error"); - //CommonStartup.oplog.info(String.format("URL:%sTOPIC:%sEvent Published:%s", ueburl, topic, event)); CommonStartup.oplog.info(String.format("StreamID:%s Event Published:%s ", streamid, event)); } @VisibleForTesting - public CambriaBatchingPublisher getDmaapPublisher(String streamId) { + CambriaBatchingPublisher getDmaapPublisher(String streamId) { return dmaapPublishers.getByStreamId(streamId); } } |