diff options
Diffstat (limited to 'utils/DmaapPublisher/src/main/java')
6 files changed, 395 insertions, 0 deletions
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/CliArgs.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/CliArgs.java new file mode 100644 index 0000000000..38e53c824f --- /dev/null +++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/CliArgs.java @@ -0,0 +1,59 @@ +package org.openecomp.sdc.dmaap; + +import org.kohsuke.args4j.Option; +import com.google.common.base.MoreObjects; + +public class CliArgs { + + @Option(name="yml",aliases = {"-YML","YML","-yml","-YAML","YAML","-yaml"}, usage="mandatory arg. YAML filename", required=true) + private String yamlFilename; + + @Option(name="path",aliases = {"-path","PATH","-PATH"}, usage="mandatory arg. path to the yaml file which contains topic config (publisher data + messages)", required=true) + private String yamlPath; + + @Option(name="cr",aliases = {"CR","-cr","-CR"}, usage="optional arg. concurrent requests", required=false) + private String concurrentRequests; + + @Option(name="notification",aliases = {"NOTIFICATION","-NOTIFICATION","-notification"}, usage="optional load dynamic messages", required=false) + private String notificationData; + + public String getYamlPath() { + return yamlPath; + } + + public String getYamlFilename() { + return yamlFilename; + } + + public void setYamlPath(String yamlPath) { + this.yamlPath = yamlPath; + } + + + public String getConcurrentRequests() { + return concurrentRequests; + } + + public void setConcurrentRequests(String concurrentRequests) { + this.concurrentRequests = concurrentRequests; + } + + public String getNotificationData() { + return notificationData; + } + + + public void setYamlFilename(String yamlFilename) { + this.yamlFilename = yamlFilename; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("yamlPath", yamlPath) + .add("concurrentRequests", concurrentRequests) + .toString(); + } + + +} diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublishTool.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublishTool.java new file mode 100644 index 0000000000..61e48fa50e --- /dev/null +++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublishTool.java @@ -0,0 +1,97 @@ +package org.openecomp.sdc.dmaap; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRPublisher.message; + +public class DmaapPublishTool { + + private static final Logger logger = LoggerFactory.getLogger(DmaapPublishTool.class); + final private TopicConfig topicConfig; + + public DmaapPublishTool(String yamlPath) throws FileNotFoundException { + topicConfig = loadTopicConfig(yamlPath); + System.out.println("yaml file loaded."); + } + public DmaapPublishTool(String yamlPath , String notifications) throws FileNotFoundException { + topicConfig = loadTopicConfig(yamlPath); + if (StringUtils.isNotBlank(notifications) ) + topicConfig.add( notifications ); + System.out.println("yaml file loaded."); + } + + public void addNotifications(Collection<String> notification){ + topicConfig.addAll( notification ); + } + + //safe stream doesn't throw null pointer exception + public <T> Collection<T> safe(Collection<T> obj){ + return Optional.ofNullable(obj).orElse(Collections.emptySet()); + } + public <T> List<T> safe(List<T> obj){ + return Optional.ofNullable(obj).orElse(Collections.emptyList()); + } + + public void publish(String path) throws IOException, InterruptedException { + MRBatchingPublisher pub = createPublisher( topicConfig, path ); + System.out.println( "pending message count -> "+pub.getPendingMessageCount() ); + List<String> list = this.topicConfig.getIncomingTopicMessages(); + for(String msg : safe(list) ){ + publishOne( pub , msg ); + } + closePublisher(pub); + } + + private MRBatchingPublisher createPublisher(TopicConfig topicConfig,String path) throws IOException { + MRBatchingPublisher publisher = MRClientFactory.createBatchingPublisher(Objects.requireNonNull(Util.toPath(path,topicConfig.getPublisherPropertiesFilePath()))); + System.out.println("publisher created."); + return publisher; + } + + private TopicConfig loadTopicConfig(String yamlPath) throws FileNotFoundException { + File yamlFile = new File(Objects.requireNonNull(yamlPath)); + InputStream input = new FileInputStream(yamlFile); + Yaml yamlHelper = new Yaml(); + return yamlHelper.loadAs(input, TopicConfig.class); + } + + private void publishOne(MRBatchingPublisher pub, String msg) throws IOException, InterruptedException { + System.out.println("sending: " + msg); + pub.send(msg); + System.out.println("message sent."); + } + + private void closePublisher(MRBatchingPublisher pub) throws IOException, InterruptedException { + System.out.println("closing publisher..."); + // close the publisher to make sure everything's sent before exiting. The batching + // publisher interface allows the app to get the set of unsent messages. It could + // write them to disk, for example, to try to send them later. + final List<message> stuck = pub.close(20, TimeUnit.SECONDS); + if(!stuck.isEmpty()) + { + final String errMsg = stuck.size() + " messages unsent"; + logger.error(errMsg); + System.err.println(errMsg); + } + else + { + final String successMsg = "Clean exit; all messages sent."; + logger.info(successMsg); + System.out.println(successMsg); + } + } +} diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublisher.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublisher.java new file mode 100644 index 0000000000..fd558356ed --- /dev/null +++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublisher.java @@ -0,0 +1,149 @@ +package org.openecomp.sdc.dmaap; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; +import org.kohsuke.args4j.CmdLineException; +import org.kohsuke.args4j.CmdLineParser; +import org.kohsuke.args4j.OptionHandlerFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; +import java.util.stream.IntStream; + + +import static org.openecomp.sdc.dmaap.Util.*; + +public class DmaapPublisher { + private static final Logger logger = LoggerFactory.getLogger(DmaapPublisher.class); + private static RequestManager requestManager ; + private static final ConcurrentLinkedDeque notificationBuffer = new ConcurrentLinkedDeque(); + + + private static final List<Long> registeredTasks = new CopyOnWriteArrayList<>(); + private DmaapPublisher() {} + + public static void add(String notification){ + notificationBuffer.add( notification ); + } + public static void addAll(List<String> notifications){ + notificationBuffer.addAll( notifications ); + } + public static void main(String[] args) { + doPublish(args); + } + + private static void doPublish( String[] args ) { + CliArgs cliArgs = new CliArgs(); + CmdLineParser parser = new CmdLineParser(cliArgs); + + try { + // parse the arguments. + parser.parseArgument( args ); + doPublish( cliArgs ); + } + catch(CmdLineException e) { + logger.error("#doPublish - failed to parse arguments.", e); + printUsage(parser, e); + return; + } + } + + public static void doPublish( CliArgs cliArgs ){ + try { + // parse the arguments. + DmaapPublishTool tool = new DmaapPublishTool( toPath(cliArgs.getYamlPath() , cliArgs.getYamlFilename()) , cliArgs.getNotificationData() ); + Collection<String> notifications = new ArrayList<String>( notificationBuffer ); + tool.addNotifications( notifications ); + notificationBuffer.removeAll(notifications); + Integer concurrentRequestCount = 1; + if ( StringUtils.isNotBlank( cliArgs.getConcurrentRequests() ) ) + concurrentRequestCount = Integer.parseInt( cliArgs.getConcurrentRequests() ); + requestManager = new RequestManager( concurrentRequestCount ); + + IntStream.range(0,concurrentRequestCount).forEach( it -> { + //region - report upon finish mechanishem + long ticket = System.nanoTime(); + registeredTasks.add( ticket ); + Consumer callback = ( uniqueTicket ) -> { + synchronized ( registeredTasks ){ + registeredTasks.remove( (long)uniqueTicket ); + registeredTasks.notifyAll(); + }}; + + RunnableReporter task = new RunnableReporter( ticket , tool , cliArgs , callback ); + requestManager.getExecutor().execute( task ) ; + }); + } + catch(NumberFormatException e) { + logger.error("#doPublish - failed to parse argument CR.", e); + return; + } + catch(Exception e) { + logger.error("#doPublish - failed to publish.", e); + } + } + + public static class RunnableReporter implements Runnable{ + + final private long ticket ; + final private DmaapPublishTool tool; + final private CliArgs cliArgs; + final Consumer reporter; + + public RunnableReporter(final long ticket , final DmaapPublishTool tool , final CliArgs args , Consumer reporter){ + this.ticket = ticket ; + this.tool = tool ; + this.cliArgs = args ; + this.reporter = reporter; + } + @Override + public void run() { + try { + tool.publish( cliArgs.getYamlPath() ); + reporter.accept(ticket); + }catch(IOException e){ + logger.error("#doPublish - failed to publish.", e); + }catch(InterruptedException e){ + logger.error("#doPublish - cannot complete publish, thread interuppted.", e); + Thread.currentThread().interrupt(); + } + } + } + + + public static List<Long> getRegisteredTasks() { + return registeredTasks; + } + + public static void preparePublish( String path, String filename , String concurrentRequests ){ + + CliArgs cliArgs = new CliArgs(); + if ( StringUtils.isNotBlank( filename ) ) + cliArgs.setYamlFilename( filename ); + if ( StringUtils.isNotBlank( path ) ) + cliArgs.setYamlPath( path ); + if ( NumberUtils.isCreatable( concurrentRequests ) ) + cliArgs.setConcurrentRequests( concurrentRequests ); + + doPublish( cliArgs ); + + } + + + private static void printUsage(CmdLineParser parser, CmdLineException e) { + System.err.println( e.getMessage() ); + System.err.println("java DmaapPublisher [options...] arguments..."); + // print the list of available options + parser.printUsage(System.err); + System.err.println(); + // print option sample. This is useful some time + System.err.println(" Example: java DmaapPublisher " + parser.printExample(OptionHandlerFilter.ALL)); + + } +} diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/RequestManager.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/RequestManager.java new file mode 100644 index 0000000000..597baac2bd --- /dev/null +++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/RequestManager.java @@ -0,0 +1,19 @@ +package org.openecomp.sdc.dmaap; + +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +public class RequestManager { + + private Executor executor; + + public RequestManager(int poolSize ){ + int sz = Math.max( poolSize , 1); + int recommendedMaxSz = Runtime.getRuntime().availableProcessors() * 2; + executor = Executors.newFixedThreadPool( Math.min( sz , recommendedMaxSz ) ); + } + + public Executor getExecutor() { + return executor; + } +} diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/TopicConfig.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/TopicConfig.java new file mode 100644 index 0000000000..a5b43ad0d5 --- /dev/null +++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/TopicConfig.java @@ -0,0 +1,52 @@ +package org.openecomp.sdc.dmaap; + +import com.google.common.base.MoreObjects; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +public class TopicConfig { + + private String publisherPropertiesFilePath; + private String[] topicMessages; //messages from file + private final List<String> incomingTopicMessages = Collections.synchronizedList( new ArrayList<String>() ); //incoming messages from network stream|Main + + public String getPublisherPropertiesFilePath() { + return publisherPropertiesFilePath; + } + public void setPublisherPropertiesFilePath(String publisherPropertiesFilePath) { + this.publisherPropertiesFilePath = publisherPropertiesFilePath; + } + + public List<String> getIncomingTopicMessages() { + return incomingTopicMessages; + } + public String[] getTopicMessages() { + return topicMessages; + } + //add incoming message + public TopicConfig add( String notifications ){ + incomingTopicMessages.add( notifications); + return this; + } + + public TopicConfig addAll( Collection<String> notifications ){ + incomingTopicMessages.addAll( notifications ); + return this; + } + + public void setTopicMessages(String[] topicMessages) { + this.topicMessages = topicMessages; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("publisherPropertiesFilePath", publisherPropertiesFilePath) + .add("topicMessages", topicMessages) + .toString(); + } + +} diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/Util.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/Util.java new file mode 100644 index 0000000000..491b07abdd --- /dev/null +++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/Util.java @@ -0,0 +1,19 @@ +package org.openecomp.sdc.dmaap; + +import org.apache.commons.lang3.StringUtils; + +import java.nio.file.InvalidPathException; + +public class Util { + + public static String toPath(String path , String filename) throws InvalidPathException{ + if (StringUtils.isNotBlank(path) ){ + if (path.trim().endsWith("/") || path.trim().endsWith("/")){ + return path+(filename!=null ? filename : ""); + } + return path+"/"+(filename!=null ? filename : ""); + + } + throw new InvalidPathException("wrong path configuration cannot find path -> ",path); + } +} |