summaryrefslogtreecommitdiffstats
path: root/utils/DmaapPublisher/src/main/java
diff options
context:
space:
mode:
authorMichael Lando <ml636r@att.com>2018-03-04 14:53:33 +0200
committerMichael Lando <ml636r@att.com>2018-03-07 13:19:05 +0000
commita5445100050e49e83f73424198d73cd72d672a4d (patch)
treecacf4df817df31be23e4e790d1dda857bdae061e /utils/DmaapPublisher/src/main/java
parent51157f92c21976cba4914c378aaa3cba49826931 (diff)
Sync Integ to Master
Change-Id: I71e3acc26fa612127756ac04073a522b9cc6cd74 Issue-ID: SDC-977 Signed-off-by: Gitelman, Tal (tg851x) <tg851x@intl.att.com>
Diffstat (limited to 'utils/DmaapPublisher/src/main/java')
-rw-r--r--utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/CliArgs.java59
-rw-r--r--utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublishTool.java97
-rw-r--r--utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublisher.java149
-rw-r--r--utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/RequestManager.java19
-rw-r--r--utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/TopicConfig.java52
-rw-r--r--utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/Util.java19
6 files changed, 395 insertions, 0 deletions
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/CliArgs.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/CliArgs.java
new file mode 100644
index 0000000000..38e53c824f
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/CliArgs.java
@@ -0,0 +1,59 @@
+package org.openecomp.sdc.dmaap;
+
+import org.kohsuke.args4j.Option;
+import com.google.common.base.MoreObjects;
+
+public class CliArgs {
+
+ @Option(name="yml",aliases = {"-YML","YML","-yml","-YAML","YAML","-yaml"}, usage="mandatory arg. YAML filename", required=true)
+ private String yamlFilename;
+
+ @Option(name="path",aliases = {"-path","PATH","-PATH"}, usage="mandatory arg. path to the yaml file which contains topic config (publisher data + messages)", required=true)
+ private String yamlPath;
+
+ @Option(name="cr",aliases = {"CR","-cr","-CR"}, usage="optional arg. concurrent requests", required=false)
+ private String concurrentRequests;
+
+ @Option(name="notification",aliases = {"NOTIFICATION","-NOTIFICATION","-notification"}, usage="optional load dynamic messages", required=false)
+ private String notificationData;
+
+ public String getYamlPath() {
+ return yamlPath;
+ }
+
+ public String getYamlFilename() {
+ return yamlFilename;
+ }
+
+ public void setYamlPath(String yamlPath) {
+ this.yamlPath = yamlPath;
+ }
+
+
+ public String getConcurrentRequests() {
+ return concurrentRequests;
+ }
+
+ public void setConcurrentRequests(String concurrentRequests) {
+ this.concurrentRequests = concurrentRequests;
+ }
+
+ public String getNotificationData() {
+ return notificationData;
+ }
+
+
+ public void setYamlFilename(String yamlFilename) {
+ this.yamlFilename = yamlFilename;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("yamlPath", yamlPath)
+ .add("concurrentRequests", concurrentRequests)
+ .toString();
+ }
+
+
+}
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublishTool.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublishTool.java
new file mode 100644
index 0000000000..61e48fa50e
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublishTool.java
@@ -0,0 +1,97 @@
+package org.openecomp.sdc.dmaap;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import com.att.nsa.mr.client.MRBatchingPublisher;
+import com.att.nsa.mr.client.MRClientFactory;
+import com.att.nsa.mr.client.MRPublisher.message;
+
+public class DmaapPublishTool {
+
+ private static final Logger logger = LoggerFactory.getLogger(DmaapPublishTool.class);
+ final private TopicConfig topicConfig;
+
+ public DmaapPublishTool(String yamlPath) throws FileNotFoundException {
+ topicConfig = loadTopicConfig(yamlPath);
+ System.out.println("yaml file loaded.");
+ }
+ public DmaapPublishTool(String yamlPath , String notifications) throws FileNotFoundException {
+ topicConfig = loadTopicConfig(yamlPath);
+ if (StringUtils.isNotBlank(notifications) )
+ topicConfig.add( notifications );
+ System.out.println("yaml file loaded.");
+ }
+
+ public void addNotifications(Collection<String> notification){
+ topicConfig.addAll( notification );
+ }
+
+ //safe stream doesn't throw null pointer exception
+ public <T> Collection<T> safe(Collection<T> obj){
+ return Optional.ofNullable(obj).orElse(Collections.emptySet());
+ }
+ public <T> List<T> safe(List<T> obj){
+ return Optional.ofNullable(obj).orElse(Collections.emptyList());
+ }
+
+ public void publish(String path) throws IOException, InterruptedException {
+ MRBatchingPublisher pub = createPublisher( topicConfig, path );
+ System.out.println( "pending message count -> "+pub.getPendingMessageCount() );
+ List<String> list = this.topicConfig.getIncomingTopicMessages();
+ for(String msg : safe(list) ){
+ publishOne( pub , msg );
+ }
+ closePublisher(pub);
+ }
+
+ private MRBatchingPublisher createPublisher(TopicConfig topicConfig,String path) throws IOException {
+ MRBatchingPublisher publisher = MRClientFactory.createBatchingPublisher(Objects.requireNonNull(Util.toPath(path,topicConfig.getPublisherPropertiesFilePath())));
+ System.out.println("publisher created.");
+ return publisher;
+ }
+
+ private TopicConfig loadTopicConfig(String yamlPath) throws FileNotFoundException {
+ File yamlFile = new File(Objects.requireNonNull(yamlPath));
+ InputStream input = new FileInputStream(yamlFile);
+ Yaml yamlHelper = new Yaml();
+ return yamlHelper.loadAs(input, TopicConfig.class);
+ }
+
+ private void publishOne(MRBatchingPublisher pub, String msg) throws IOException, InterruptedException {
+ System.out.println("sending: " + msg);
+ pub.send(msg);
+ System.out.println("message sent.");
+ }
+
+ private void closePublisher(MRBatchingPublisher pub) throws IOException, InterruptedException {
+ System.out.println("closing publisher...");
+ // close the publisher to make sure everything's sent before exiting. The batching
+ // publisher interface allows the app to get the set of unsent messages. It could
+ // write them to disk, for example, to try to send them later.
+ final List<message> stuck = pub.close(20, TimeUnit.SECONDS);
+ if(!stuck.isEmpty())
+ {
+ final String errMsg = stuck.size() + " messages unsent";
+ logger.error(errMsg);
+ System.err.println(errMsg);
+ }
+ else
+ {
+ final String successMsg = "Clean exit; all messages sent.";
+ logger.info(successMsg);
+ System.out.println(successMsg);
+ }
+ }
+}
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublisher.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublisher.java
new file mode 100644
index 0000000000..fd558356ed
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublisher.java
@@ -0,0 +1,149 @@
+package org.openecomp.sdc.dmaap;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.OptionHandlerFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+
+
+import static org.openecomp.sdc.dmaap.Util.*;
+
+public class DmaapPublisher {
+ private static final Logger logger = LoggerFactory.getLogger(DmaapPublisher.class);
+ private static RequestManager requestManager ;
+ private static final ConcurrentLinkedDeque notificationBuffer = new ConcurrentLinkedDeque();
+
+
+ private static final List<Long> registeredTasks = new CopyOnWriteArrayList<>();
+ private DmaapPublisher() {}
+
+ public static void add(String notification){
+ notificationBuffer.add( notification );
+ }
+ public static void addAll(List<String> notifications){
+ notificationBuffer.addAll( notifications );
+ }
+ public static void main(String[] args) {
+ doPublish(args);
+ }
+
+ private static void doPublish( String[] args ) {
+ CliArgs cliArgs = new CliArgs();
+ CmdLineParser parser = new CmdLineParser(cliArgs);
+
+ try {
+ // parse the arguments.
+ parser.parseArgument( args );
+ doPublish( cliArgs );
+ }
+ catch(CmdLineException e) {
+ logger.error("#doPublish - failed to parse arguments.", e);
+ printUsage(parser, e);
+ return;
+ }
+ }
+
+ public static void doPublish( CliArgs cliArgs ){
+ try {
+ // parse the arguments.
+ DmaapPublishTool tool = new DmaapPublishTool( toPath(cliArgs.getYamlPath() , cliArgs.getYamlFilename()) , cliArgs.getNotificationData() );
+ Collection<String> notifications = new ArrayList<String>( notificationBuffer );
+ tool.addNotifications( notifications );
+ notificationBuffer.removeAll(notifications);
+ Integer concurrentRequestCount = 1;
+ if ( StringUtils.isNotBlank( cliArgs.getConcurrentRequests() ) )
+ concurrentRequestCount = Integer.parseInt( cliArgs.getConcurrentRequests() );
+ requestManager = new RequestManager( concurrentRequestCount );
+
+ IntStream.range(0,concurrentRequestCount).forEach( it -> {
+ //region - report upon finish mechanishem
+ long ticket = System.nanoTime();
+ registeredTasks.add( ticket );
+ Consumer callback = ( uniqueTicket ) -> {
+ synchronized ( registeredTasks ){
+ registeredTasks.remove( (long)uniqueTicket );
+ registeredTasks.notifyAll();
+ }};
+
+ RunnableReporter task = new RunnableReporter( ticket , tool , cliArgs , callback );
+ requestManager.getExecutor().execute( task ) ;
+ });
+ }
+ catch(NumberFormatException e) {
+ logger.error("#doPublish - failed to parse argument CR.", e);
+ return;
+ }
+ catch(Exception e) {
+ logger.error("#doPublish - failed to publish.", e);
+ }
+ }
+
+ public static class RunnableReporter implements Runnable{
+
+ final private long ticket ;
+ final private DmaapPublishTool tool;
+ final private CliArgs cliArgs;
+ final Consumer reporter;
+
+ public RunnableReporter(final long ticket , final DmaapPublishTool tool , final CliArgs args , Consumer reporter){
+ this.ticket = ticket ;
+ this.tool = tool ;
+ this.cliArgs = args ;
+ this.reporter = reporter;
+ }
+ @Override
+ public void run() {
+ try {
+ tool.publish( cliArgs.getYamlPath() );
+ reporter.accept(ticket);
+ }catch(IOException e){
+ logger.error("#doPublish - failed to publish.", e);
+ }catch(InterruptedException e){
+ logger.error("#doPublish - cannot complete publish, thread interuppted.", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+
+ public static List<Long> getRegisteredTasks() {
+ return registeredTasks;
+ }
+
+ public static void preparePublish( String path, String filename , String concurrentRequests ){
+
+ CliArgs cliArgs = new CliArgs();
+ if ( StringUtils.isNotBlank( filename ) )
+ cliArgs.setYamlFilename( filename );
+ if ( StringUtils.isNotBlank( path ) )
+ cliArgs.setYamlPath( path );
+ if ( NumberUtils.isCreatable( concurrentRequests ) )
+ cliArgs.setConcurrentRequests( concurrentRequests );
+
+ doPublish( cliArgs );
+
+ }
+
+
+ private static void printUsage(CmdLineParser parser, CmdLineException e) {
+ System.err.println( e.getMessage() );
+ System.err.println("java DmaapPublisher [options...] arguments...");
+ // print the list of available options
+ parser.printUsage(System.err);
+ System.err.println();
+ // print option sample. This is useful some time
+ System.err.println(" Example: java DmaapPublisher " + parser.printExample(OptionHandlerFilter.ALL));
+
+ }
+}
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/RequestManager.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/RequestManager.java
new file mode 100644
index 0000000000..597baac2bd
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/RequestManager.java
@@ -0,0 +1,19 @@
+package org.openecomp.sdc.dmaap;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+public class RequestManager {
+
+ private Executor executor;
+
+ public RequestManager(int poolSize ){
+ int sz = Math.max( poolSize , 1);
+ int recommendedMaxSz = Runtime.getRuntime().availableProcessors() * 2;
+ executor = Executors.newFixedThreadPool( Math.min( sz , recommendedMaxSz ) );
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+}
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/TopicConfig.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/TopicConfig.java
new file mode 100644
index 0000000000..a5b43ad0d5
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/TopicConfig.java
@@ -0,0 +1,52 @@
+package org.openecomp.sdc.dmaap;
+
+import com.google.common.base.MoreObjects;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public class TopicConfig {
+
+ private String publisherPropertiesFilePath;
+ private String[] topicMessages; //messages from file
+ private final List<String> incomingTopicMessages = Collections.synchronizedList( new ArrayList<String>() ); //incoming messages from network stream|Main
+
+ public String getPublisherPropertiesFilePath() {
+ return publisherPropertiesFilePath;
+ }
+ public void setPublisherPropertiesFilePath(String publisherPropertiesFilePath) {
+ this.publisherPropertiesFilePath = publisherPropertiesFilePath;
+ }
+
+ public List<String> getIncomingTopicMessages() {
+ return incomingTopicMessages;
+ }
+ public String[] getTopicMessages() {
+ return topicMessages;
+ }
+ //add incoming message
+ public TopicConfig add( String notifications ){
+ incomingTopicMessages.add( notifications);
+ return this;
+ }
+
+ public TopicConfig addAll( Collection<String> notifications ){
+ incomingTopicMessages.addAll( notifications );
+ return this;
+ }
+
+ public void setTopicMessages(String[] topicMessages) {
+ this.topicMessages = topicMessages;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("publisherPropertiesFilePath", publisherPropertiesFilePath)
+ .add("topicMessages", topicMessages)
+ .toString();
+ }
+
+}
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/Util.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/Util.java
new file mode 100644
index 0000000000..491b07abdd
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/Util.java
@@ -0,0 +1,19 @@
+package org.openecomp.sdc.dmaap;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.nio.file.InvalidPathException;
+
+public class Util {
+
+ public static String toPath(String path , String filename) throws InvalidPathException{
+ if (StringUtils.isNotBlank(path) ){
+ if (path.trim().endsWith("/") || path.trim().endsWith("/")){
+ return path+(filename!=null ? filename : "");
+ }
+ return path+"/"+(filename!=null ? filename : "");
+
+ }
+ throw new InvalidPathException("wrong path configuration cannot find path -> ",path);
+ }
+}