diff options
Diffstat (limited to 'utils/DmaapPublisher/src/main/java/org')
6 files changed, 0 insertions, 507 deletions
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/CliArgs.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/CliArgs.java deleted file mode 100644 index 87fdcdd87a..0000000000 --- a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/CliArgs.java +++ /dev/null @@ -1,79 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * SDC - * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.sdc.dmaap; - -import com.google.common.base.MoreObjects; -import org.kohsuke.args4j.Option; - -public class CliArgs { - - @Option(name="yml",aliases = {"-YML","YML","-yml","-YAML","YAML","-yaml"}, usage="mandatory arg. YAML filename", required=true) - private String yamlFilename; - - @Option(name="path",aliases = {"-path","PATH","-PATH"}, usage="mandatory arg. path to the yaml file which contains topic config (publisher data + messages)", required=true) - private String yamlPath; - - @Option(name="cr",aliases = {"CR","-cr","-CR"}, usage="optional arg. concurrent requests", required=false) - private String concurrentRequests; - - @Option(name="notification",aliases = {"NOTIFICATION","-NOTIFICATION","-notification"}, usage="optional load dynamic messages", required=false) - private String notificationData; - - public String getYamlPath() { - return yamlPath; - } - - public String getYamlFilename() { - return yamlFilename; - } - - public void setYamlPath(String yamlPath) { - this.yamlPath = yamlPath; - } - - - public String getConcurrentRequests() { - return concurrentRequests; - } - - public void setConcurrentRequests(String concurrentRequests) { - this.concurrentRequests = concurrentRequests; - } - - public String getNotificationData() { - return notificationData; - } - - - public void setYamlFilename(String yamlFilename) { - this.yamlFilename = yamlFilename; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("yamlPath", yamlPath) - .add("concurrentRequests", concurrentRequests) - .toString(); - } - - -} diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublishTool.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublishTool.java deleted file mode 100644 index 08411c6b69..0000000000 --- a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublishTool.java +++ /dev/null @@ -1,109 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * SDC - * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.sdc.dmaap; - -import com.att.nsa.mr.client.MRBatchingPublisher; -import com.att.nsa.mr.client.MRClientFactory; -import com.att.nsa.mr.client.MRPublisher.message; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.yaml.snakeyaml.Yaml; - -import java.util.concurrent.TimeUnit; - -public class DmaapPublishTool { - - private static final Logger logger = LoggerFactory.getLogger(DmaapPublishTool.class); - final private TopicConfig topicConfig; - - public DmaapPublishTool(String yamlPath) throws FileNotFoundException { - topicConfig = loadTopicConfig(yamlPath); - System.out.println("yaml file loaded."); - } - public DmaapPublishTool(String yamlPath , String notifications) throws FileNotFoundException { - topicConfig = loadTopicConfig(yamlPath); - if (StringUtils.isNotBlank(notifications) ) - topicConfig.add( notifications ); - System.out.println("yaml file loaded."); - } - - public void addNotifications(Collection<String> notification){ - topicConfig.addAll( notification ); - } - - //safe stream doesn't throw null pointer exception - public <T> Collection<T> safe(Collection<T> obj){ - return Optional.ofNullable(obj).orElse(Collections.emptySet()); - } - public <T> List<T> safe(List<T> obj){ - return Optional.ofNullable(obj).orElse(Collections.emptyList()); - } - - public void publish(String path) throws IOException, InterruptedException { - MRBatchingPublisher pub = createPublisher( topicConfig, path ); - System.out.println( "pending message count -> "+pub.getPendingMessageCount() ); - List<String> list = this.topicConfig.getIncomingTopicMessages(); - for(String msg : safe(list) ){ - publishOne( pub , msg ); - } - closePublisher(pub); - } - - private MRBatchingPublisher createPublisher(TopicConfig topicConfig,String path) throws IOException { - MRBatchingPublisher publisher = MRClientFactory.createBatchingPublisher(Objects.requireNonNull(Util.toPath(path,topicConfig.getPublisherPropertiesFilePath()))); - System.out.println("publisher created."); - return publisher; - } - - private TopicConfig loadTopicConfig(String yamlPath) throws FileNotFoundException { - File yamlFile = new File(Objects.requireNonNull(yamlPath)); - InputStream input = new FileInputStream(yamlFile); - Yaml yamlHelper = new Yaml(); - return yamlHelper.loadAs(input, TopicConfig.class); - } - - private void publishOne(MRBatchingPublisher pub, String msg) throws IOException, InterruptedException { - System.out.println("sending: " + msg); - pub.send(msg); - System.out.println("message sent."); - } - - private void closePublisher(MRBatchingPublisher pub) throws IOException, InterruptedException { - System.out.println("closing publisher..."); - // close the publisher to make sure everything's sent before exiting. The batching - // publisher interface allows the app to get the set of unsent messages. It could - // write them to disk, for example, to try to send them later. - final List<message> stuck = pub.close(20, TimeUnit.SECONDS); - if(!stuck.isEmpty()) - { - final String errMsg = stuck.size() + " messages unsent"; - logger.error(errMsg); - System.err.println(errMsg); - } - else - { - final String successMsg = "Clean exit; all messages sent."; - logger.info(successMsg); - System.out.println(successMsg); - } - } -} diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublisher.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublisher.java deleted file mode 100644 index 8bf39e56a1..0000000000 --- a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublisher.java +++ /dev/null @@ -1,169 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * SDC - * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.sdc.dmaap; - -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.math.NumberUtils; -import org.kohsuke.args4j.CmdLineException; -import org.kohsuke.args4j.CmdLineParser; -import org.kohsuke.args4j.OptionHandlerFilter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.function.Consumer; -import java.util.stream.IntStream; - -import static org.openecomp.sdc.dmaap.Util.*; - -public class DmaapPublisher { - private static final Logger logger = LoggerFactory.getLogger(DmaapPublisher.class); - private static RequestManager requestManager ; - private static final ConcurrentLinkedDeque notificationBuffer = new ConcurrentLinkedDeque(); - - - private static final List<Long> registeredTasks = new CopyOnWriteArrayList<>(); - private DmaapPublisher() {} - - public static void add(String notification){ - notificationBuffer.add( notification ); - } - public static void addAll(List<String> notifications){ - notificationBuffer.addAll( notifications ); - } - public static void main(String[] args) { - doPublish(args); - } - - private static void doPublish( String[] args ) { - CliArgs cliArgs = new CliArgs(); - CmdLineParser parser = new CmdLineParser(cliArgs); - - try { - // parse the arguments. - parser.parseArgument( args ); - doPublish( cliArgs ); - } - catch(CmdLineException e) { - logger.error("#doPublish - failed to parse arguments.", e); - printUsage(parser, e); - return; - } - } - - public static void doPublish( CliArgs cliArgs ){ - try { - // parse the arguments. - DmaapPublishTool tool = new DmaapPublishTool( toPath(cliArgs.getYamlPath() , cliArgs.getYamlFilename()) , cliArgs.getNotificationData() ); - Collection<String> notifications = new ArrayList<String>( notificationBuffer ); - tool.addNotifications( notifications ); - notificationBuffer.removeAll(notifications); - Integer concurrentRequestCount = 1; - if ( StringUtils.isNotBlank( cliArgs.getConcurrentRequests() ) ) - concurrentRequestCount = Integer.parseInt( cliArgs.getConcurrentRequests() ); - requestManager = new RequestManager( concurrentRequestCount ); - - IntStream.range(0,concurrentRequestCount).forEach( it -> { - //region - report upon finish mechanishem - long ticket = System.nanoTime(); - registeredTasks.add( ticket ); - Consumer callback = ( uniqueTicket ) -> { - synchronized ( registeredTasks ){ - registeredTasks.remove( (long)uniqueTicket ); - registeredTasks.notifyAll(); - }}; - - RunnableReporter task = new RunnableReporter( ticket , tool , cliArgs , callback ); - requestManager.getExecutor().execute( task ) ; - }); - } - catch(NumberFormatException e) { - logger.error("#doPublish - failed to parse argument CR.", e); - return; - } - catch(Exception e) { - logger.error("#doPublish - failed to publish.", e); - } - } - - public static class RunnableReporter implements Runnable{ - - final private long ticket ; - final private DmaapPublishTool tool; - final private CliArgs cliArgs; - final Consumer reporter; - - public RunnableReporter(final long ticket , final DmaapPublishTool tool , final CliArgs args , Consumer reporter){ - this.ticket = ticket ; - this.tool = tool ; - this.cliArgs = args ; - this.reporter = reporter; - } - @Override - public void run() { - try { - tool.publish( cliArgs.getYamlPath() ); - reporter.accept(ticket); - }catch(IOException e){ - logger.error("#doPublish - failed to publish.", e); - }catch(InterruptedException e){ - logger.error("#doPublish - cannot complete publish, thread interuppted.", e); - Thread.currentThread().interrupt(); - } - } - } - - - public static List<Long> getRegisteredTasks() { - return registeredTasks; - } - - public static void preparePublish( String path, String filename , String concurrentRequests ){ - - CliArgs cliArgs = new CliArgs(); - if ( StringUtils.isNotBlank( filename ) ) - cliArgs.setYamlFilename( filename ); - if ( StringUtils.isNotBlank( path ) ) - cliArgs.setYamlPath( path ); - if ( NumberUtils.isCreatable( concurrentRequests ) ) - cliArgs.setConcurrentRequests( concurrentRequests ); - - doPublish( cliArgs ); - - } - - - private static void printUsage(CmdLineParser parser, CmdLineException e) { - System.err.println( e.getMessage() ); - System.err.println("java DmaapPublisher [options...] arguments..."); - // print the list of available options - parser.printUsage(System.err); - System.err.println(); - // print option sample. This is useful some time - System.err.println(" Example: java DmaapPublisher " + parser.printExample(OptionHandlerFilter.ALL)); - - } -} diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/RequestManager.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/RequestManager.java deleted file mode 100644 index c30a397372..0000000000 --- a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/RequestManager.java +++ /dev/null @@ -1,39 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * SDC - * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.sdc.dmaap; - -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; - -public class RequestManager { - - private Executor executor; - - public RequestManager(int poolSize ){ - int sz = Math.max( poolSize , 1); - int recommendedMaxSz = Runtime.getRuntime().availableProcessors() * 2; - executor = Executors.newFixedThreadPool( Math.min( sz , recommendedMaxSz ) ); - } - - public Executor getExecutor() { - return executor; - } -} diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/TopicConfig.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/TopicConfig.java deleted file mode 100644 index c76a0f57b0..0000000000 --- a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/TopicConfig.java +++ /dev/null @@ -1,72 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * SDC - * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.sdc.dmaap; - -import com.google.common.base.MoreObjects; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -public class TopicConfig { - - private String publisherPropertiesFilePath; - private String[] topicMessages; //messages from file - private final List<String> incomingTopicMessages = Collections.synchronizedList( new ArrayList<String>() ); //incoming messages from network stream|Main - - public String getPublisherPropertiesFilePath() { - return publisherPropertiesFilePath; - } - public void setPublisherPropertiesFilePath(String publisherPropertiesFilePath) { - this.publisherPropertiesFilePath = publisherPropertiesFilePath; - } - - public List<String> getIncomingTopicMessages() { - return incomingTopicMessages; - } - public String[] getTopicMessages() { - return topicMessages; - } - //add incoming message - public TopicConfig add( String notifications ){ - incomingTopicMessages.add( notifications); - return this; - } - - public TopicConfig addAll( Collection<String> notifications ){ - incomingTopicMessages.addAll( notifications ); - return this; - } - - public void setTopicMessages(String[] topicMessages) { - this.topicMessages = topicMessages; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("publisherPropertiesFilePath", publisherPropertiesFilePath) - .add("topicMessages", topicMessages) - .toString(); - } - -} diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/Util.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/Util.java deleted file mode 100644 index cbc6198172..0000000000 --- a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/Util.java +++ /dev/null @@ -1,39 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * SDC - * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.sdc.dmaap; - -import org.apache.commons.lang3.StringUtils; - -import java.nio.file.InvalidPathException; - -public class Util { - - public static String toPath(String path , String filename) throws InvalidPathException{ - if (StringUtils.isNotBlank(path) ){ - if (path.trim().endsWith("/") || path.trim().endsWith("/")){ - return path+(filename!=null ? filename : ""); - } - return path+"/"+(filename!=null ? filename : ""); - - } - throw new InvalidPathException("wrong path configuration cannot find path -> ",path); - } -} |