summaryrefslogtreecommitdiffstats
path: root/utils/DmaapPublisher/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'utils/DmaapPublisher/src/main/java')
-rw-r--r--utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/CliArgs.java79
-rw-r--r--utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublishTool.java109
-rw-r--r--utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublisher.java169
-rw-r--r--utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/RequestManager.java39
-rw-r--r--utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/TopicConfig.java72
-rw-r--r--utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/Util.java39
6 files changed, 0 insertions, 507 deletions
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/CliArgs.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/CliArgs.java
deleted file mode 100644
index 87fdcdd87a..0000000000
--- a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/CliArgs.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * SDC
- * ================================================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.openecomp.sdc.dmaap;
-
-import com.google.common.base.MoreObjects;
-import org.kohsuke.args4j.Option;
-
-public class CliArgs {
-
- @Option(name="yml",aliases = {"-YML","YML","-yml","-YAML","YAML","-yaml"}, usage="mandatory arg. YAML filename", required=true)
- private String yamlFilename;
-
- @Option(name="path",aliases = {"-path","PATH","-PATH"}, usage="mandatory arg. path to the yaml file which contains topic config (publisher data + messages)", required=true)
- private String yamlPath;
-
- @Option(name="cr",aliases = {"CR","-cr","-CR"}, usage="optional arg. concurrent requests", required=false)
- private String concurrentRequests;
-
- @Option(name="notification",aliases = {"NOTIFICATION","-NOTIFICATION","-notification"}, usage="optional load dynamic messages", required=false)
- private String notificationData;
-
- public String getYamlPath() {
- return yamlPath;
- }
-
- public String getYamlFilename() {
- return yamlFilename;
- }
-
- public void setYamlPath(String yamlPath) {
- this.yamlPath = yamlPath;
- }
-
-
- public String getConcurrentRequests() {
- return concurrentRequests;
- }
-
- public void setConcurrentRequests(String concurrentRequests) {
- this.concurrentRequests = concurrentRequests;
- }
-
- public String getNotificationData() {
- return notificationData;
- }
-
-
- public void setYamlFilename(String yamlFilename) {
- this.yamlFilename = yamlFilename;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("yamlPath", yamlPath)
- .add("concurrentRequests", concurrentRequests)
- .toString();
- }
-
-
-}
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublishTool.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublishTool.java
deleted file mode 100644
index 08411c6b69..0000000000
--- a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublishTool.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * SDC
- * ================================================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.openecomp.sdc.dmaap;
-
-import com.att.nsa.mr.client.MRBatchingPublisher;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRPublisher.message;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
-
-import java.util.concurrent.TimeUnit;
-
-public class DmaapPublishTool {
-
- private static final Logger logger = LoggerFactory.getLogger(DmaapPublishTool.class);
- final private TopicConfig topicConfig;
-
- public DmaapPublishTool(String yamlPath) throws FileNotFoundException {
- topicConfig = loadTopicConfig(yamlPath);
- System.out.println("yaml file loaded.");
- }
- public DmaapPublishTool(String yamlPath , String notifications) throws FileNotFoundException {
- topicConfig = loadTopicConfig(yamlPath);
- if (StringUtils.isNotBlank(notifications) )
- topicConfig.add( notifications );
- System.out.println("yaml file loaded.");
- }
-
- public void addNotifications(Collection<String> notification){
- topicConfig.addAll( notification );
- }
-
- //safe stream doesn't throw null pointer exception
- public <T> Collection<T> safe(Collection<T> obj){
- return Optional.ofNullable(obj).orElse(Collections.emptySet());
- }
- public <T> List<T> safe(List<T> obj){
- return Optional.ofNullable(obj).orElse(Collections.emptyList());
- }
-
- public void publish(String path) throws IOException, InterruptedException {
- MRBatchingPublisher pub = createPublisher( topicConfig, path );
- System.out.println( "pending message count -> "+pub.getPendingMessageCount() );
- List<String> list = this.topicConfig.getIncomingTopicMessages();
- for(String msg : safe(list) ){
- publishOne( pub , msg );
- }
- closePublisher(pub);
- }
-
- private MRBatchingPublisher createPublisher(TopicConfig topicConfig,String path) throws IOException {
- MRBatchingPublisher publisher = MRClientFactory.createBatchingPublisher(Objects.requireNonNull(Util.toPath(path,topicConfig.getPublisherPropertiesFilePath())));
- System.out.println("publisher created.");
- return publisher;
- }
-
- private TopicConfig loadTopicConfig(String yamlPath) throws FileNotFoundException {
- File yamlFile = new File(Objects.requireNonNull(yamlPath));
- InputStream input = new FileInputStream(yamlFile);
- Yaml yamlHelper = new Yaml();
- return yamlHelper.loadAs(input, TopicConfig.class);
- }
-
- private void publishOne(MRBatchingPublisher pub, String msg) throws IOException, InterruptedException {
- System.out.println("sending: " + msg);
- pub.send(msg);
- System.out.println("message sent.");
- }
-
- private void closePublisher(MRBatchingPublisher pub) throws IOException, InterruptedException {
- System.out.println("closing publisher...");
- // close the publisher to make sure everything's sent before exiting. The batching
- // publisher interface allows the app to get the set of unsent messages. It could
- // write them to disk, for example, to try to send them later.
- final List<message> stuck = pub.close(20, TimeUnit.SECONDS);
- if(!stuck.isEmpty())
- {
- final String errMsg = stuck.size() + " messages unsent";
- logger.error(errMsg);
- System.err.println(errMsg);
- }
- else
- {
- final String successMsg = "Clean exit; all messages sent.";
- logger.info(successMsg);
- System.out.println(successMsg);
- }
- }
-}
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublisher.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublisher.java
deleted file mode 100644
index 8bf39e56a1..0000000000
--- a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublisher.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * SDC
- * ================================================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.openecomp.sdc.dmaap;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.math.NumberUtils;
-import org.kohsuke.args4j.CmdLineException;
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.OptionHandlerFilter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.function.Consumer;
-import java.util.stream.IntStream;
-
-import static org.openecomp.sdc.dmaap.Util.*;
-
-public class DmaapPublisher {
- private static final Logger logger = LoggerFactory.getLogger(DmaapPublisher.class);
- private static RequestManager requestManager ;
- private static final ConcurrentLinkedDeque notificationBuffer = new ConcurrentLinkedDeque();
-
-
- private static final List<Long> registeredTasks = new CopyOnWriteArrayList<>();
- private DmaapPublisher() {}
-
- public static void add(String notification){
- notificationBuffer.add( notification );
- }
- public static void addAll(List<String> notifications){
- notificationBuffer.addAll( notifications );
- }
- public static void main(String[] args) {
- doPublish(args);
- }
-
- private static void doPublish( String[] args ) {
- CliArgs cliArgs = new CliArgs();
- CmdLineParser parser = new CmdLineParser(cliArgs);
-
- try {
- // parse the arguments.
- parser.parseArgument( args );
- doPublish( cliArgs );
- }
- catch(CmdLineException e) {
- logger.error("#doPublish - failed to parse arguments.", e);
- printUsage(parser, e);
- return;
- }
- }
-
- public static void doPublish( CliArgs cliArgs ){
- try {
- // parse the arguments.
- DmaapPublishTool tool = new DmaapPublishTool( toPath(cliArgs.getYamlPath() , cliArgs.getYamlFilename()) , cliArgs.getNotificationData() );
- Collection<String> notifications = new ArrayList<String>( notificationBuffer );
- tool.addNotifications( notifications );
- notificationBuffer.removeAll(notifications);
- Integer concurrentRequestCount = 1;
- if ( StringUtils.isNotBlank( cliArgs.getConcurrentRequests() ) )
- concurrentRequestCount = Integer.parseInt( cliArgs.getConcurrentRequests() );
- requestManager = new RequestManager( concurrentRequestCount );
-
- IntStream.range(0,concurrentRequestCount).forEach( it -> {
- //region - report upon finish mechanishem
- long ticket = System.nanoTime();
- registeredTasks.add( ticket );
- Consumer callback = ( uniqueTicket ) -> {
- synchronized ( registeredTasks ){
- registeredTasks.remove( (long)uniqueTicket );
- registeredTasks.notifyAll();
- }};
-
- RunnableReporter task = new RunnableReporter( ticket , tool , cliArgs , callback );
- requestManager.getExecutor().execute( task ) ;
- });
- }
- catch(NumberFormatException e) {
- logger.error("#doPublish - failed to parse argument CR.", e);
- return;
- }
- catch(Exception e) {
- logger.error("#doPublish - failed to publish.", e);
- }
- }
-
- public static class RunnableReporter implements Runnable{
-
- final private long ticket ;
- final private DmaapPublishTool tool;
- final private CliArgs cliArgs;
- final Consumer reporter;
-
- public RunnableReporter(final long ticket , final DmaapPublishTool tool , final CliArgs args , Consumer reporter){
- this.ticket = ticket ;
- this.tool = tool ;
- this.cliArgs = args ;
- this.reporter = reporter;
- }
- @Override
- public void run() {
- try {
- tool.publish( cliArgs.getYamlPath() );
- reporter.accept(ticket);
- }catch(IOException e){
- logger.error("#doPublish - failed to publish.", e);
- }catch(InterruptedException e){
- logger.error("#doPublish - cannot complete publish, thread interuppted.", e);
- Thread.currentThread().interrupt();
- }
- }
- }
-
-
- public static List<Long> getRegisteredTasks() {
- return registeredTasks;
- }
-
- public static void preparePublish( String path, String filename , String concurrentRequests ){
-
- CliArgs cliArgs = new CliArgs();
- if ( StringUtils.isNotBlank( filename ) )
- cliArgs.setYamlFilename( filename );
- if ( StringUtils.isNotBlank( path ) )
- cliArgs.setYamlPath( path );
- if ( NumberUtils.isCreatable( concurrentRequests ) )
- cliArgs.setConcurrentRequests( concurrentRequests );
-
- doPublish( cliArgs );
-
- }
-
-
- private static void printUsage(CmdLineParser parser, CmdLineException e) {
- System.err.println( e.getMessage() );
- System.err.println("java DmaapPublisher [options...] arguments...");
- // print the list of available options
- parser.printUsage(System.err);
- System.err.println();
- // print option sample. This is useful some time
- System.err.println(" Example: java DmaapPublisher " + parser.printExample(OptionHandlerFilter.ALL));
-
- }
-}
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/RequestManager.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/RequestManager.java
deleted file mode 100644
index c30a397372..0000000000
--- a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/RequestManager.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * SDC
- * ================================================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.openecomp.sdc.dmaap;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-public class RequestManager {
-
- private Executor executor;
-
- public RequestManager(int poolSize ){
- int sz = Math.max( poolSize , 1);
- int recommendedMaxSz = Runtime.getRuntime().availableProcessors() * 2;
- executor = Executors.newFixedThreadPool( Math.min( sz , recommendedMaxSz ) );
- }
-
- public Executor getExecutor() {
- return executor;
- }
-}
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/TopicConfig.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/TopicConfig.java
deleted file mode 100644
index c76a0f57b0..0000000000
--- a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/TopicConfig.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * SDC
- * ================================================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.openecomp.sdc.dmaap;
-
-import com.google.common.base.MoreObjects;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-public class TopicConfig {
-
- private String publisherPropertiesFilePath;
- private String[] topicMessages; //messages from file
- private final List<String> incomingTopicMessages = Collections.synchronizedList( new ArrayList<String>() ); //incoming messages from network stream|Main
-
- public String getPublisherPropertiesFilePath() {
- return publisherPropertiesFilePath;
- }
- public void setPublisherPropertiesFilePath(String publisherPropertiesFilePath) {
- this.publisherPropertiesFilePath = publisherPropertiesFilePath;
- }
-
- public List<String> getIncomingTopicMessages() {
- return incomingTopicMessages;
- }
- public String[] getTopicMessages() {
- return topicMessages;
- }
- //add incoming message
- public TopicConfig add( String notifications ){
- incomingTopicMessages.add( notifications);
- return this;
- }
-
- public TopicConfig addAll( Collection<String> notifications ){
- incomingTopicMessages.addAll( notifications );
- return this;
- }
-
- public void setTopicMessages(String[] topicMessages) {
- this.topicMessages = topicMessages;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("publisherPropertiesFilePath", publisherPropertiesFilePath)
- .add("topicMessages", topicMessages)
- .toString();
- }
-
-}
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/Util.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/Util.java
deleted file mode 100644
index cbc6198172..0000000000
--- a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/Util.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * SDC
- * ================================================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.openecomp.sdc.dmaap;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.nio.file.InvalidPathException;
-
-public class Util {
-
- public static String toPath(String path , String filename) throws InvalidPathException{
- if (StringUtils.isNotBlank(path) ){
- if (path.trim().endsWith("/") || path.trim().endsWith("/")){
- return path+(filename!=null ? filename : "");
- }
- return path+"/"+(filename!=null ? filename : "");
-
- }
- throw new InvalidPathException("wrong path configuration cannot find path -> ",path);
- }
-}