diff options
-rw-r--r-- | pom.xml | 57 | ||||
-rw-r--r-- | src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java | 60 | ||||
-rw-r--r-- | src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java | 55 | ||||
-rw-r--r-- | version.properties | 2 |
4 files changed, 103 insertions, 71 deletions
@@ -14,7 +14,7 @@ <modelVersion>4.0.0</modelVersion> <groupId>org.onap.dmaap.messagerouter.msgrtr</groupId> <artifactId>msgrtr</artifactId> - <version>1.1.17-SNAPSHOT</version> + <version>1.1.18-SNAPSHOT</version> <packaging>jar</packaging> <name>dmaap-messagerouter-msgrtr</name> <description>Message Router - Restful interface built for kafka</description> @@ -22,7 +22,7 @@ <parent> <groupId>org.onap.oparent</groupId> <artifactId>oparent</artifactId> - <version>1.2.1</version> + <version>2.0.0</version> </parent> <properties> @@ -484,6 +484,59 @@ <!-- <phase>package</phase> bind to the packaging phase <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> --> <!-- --> + <plugin> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>2.17</version> + <dependencies> + <dependency> + <groupId>org.onap.oparent</groupId> + <artifactId>checkstyle</artifactId> + <version>2.0.0</version> + </dependency> + </dependencies> + <executions> + <execution> + <id>onap-license</id> + <goals> + <goal>check</goal> + </goals> + <phase>process-sources</phase> + <configuration> + <configLocation>onap-checkstyle/check-license.xml</configLocation> + <includeResources>false</includeResources> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + <includeTestResources>false</includeTestResources> + <sourceDirectory>${project.build.sourceDirectory}</sourceDirectory> + <excludes> + </excludes> + <consoleOutput>true</consoleOutput> + <failsOnViolation>false</failsOnViolation> + </configuration> + </execution> + <execution> + <id>onap-java-style</id> + <goals> + <goal>check</goal> + </goals> + <phase>none</phase> + <configuration> + <!-- Use Google Java Style Guide: + https://github.com/checkstyle/checkstyle/blob/master/src/main/resources/google_checks.xml + with minor changes --> + <configLocation>onap-checkstyle/onap-java-style.xml</configLocation> + <!-- <sourceDirectory> is needed so that checkstyle ignores the generated sources directory --> + <sourceDirectory>${project.build.sourceDirectory}</sourceDirectory> + <includeResources>true</includeResources> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + <includeTestResources>true</includeTestResources> + <excludes> + </excludes> + <consoleOutput>true</consoleOutput> + <failsOnViolation>false</failsOnViolation> + </configuration> + </execution> + </executions> + </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-site-plugin</artifactId> diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java index 4bdd9f3..5f616c7 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java @@ -26,21 +26,19 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Properties; - import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.json.JSONException; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.util.StringUtils; import org.onap.dmaap.dmf.mr.backends.Publisher; import org.onap.dmaap.dmf.mr.constants.CambriaConstants; import org.onap.dmaap.dmf.mr.utils.Utils; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.util.StringUtils; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.drumlin.till.nv.rrNvReadable; +import kafka.common.FailedToSendMessageException; @@ -84,7 +82,7 @@ public class KafkaPublisher implements Publisher { props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); @@ -100,7 +98,7 @@ public class KafkaPublisher implements Publisher { */ @Override public void sendMessage(String topic, message msg) throws IOException{ - final List<message> msgs = new LinkedList<message>(); + final List<message> msgs = new LinkedList<>(); msgs.add(msg); sendMessages(topic, msgs); } @@ -168,29 +166,18 @@ public class KafkaPublisher implements Publisher { } } */ @Override - public void sendMessagesNew(String topic, List<? extends message> msgs) - throws IOException { - log.info("sending " + msgs.size() + " events to [" + topic + "]"); -try{ - final List<ProducerRecord<String, String>> kms = new ArrayList<>(msgs.size()); - for (message o : msgs) { - - final ProducerRecord<String, String> data = new ProducerRecord<>(topic, o.getKey(), o.toString()); - - - try { - - fProducer.send(data); - - } catch (Exception excp) { - log.error("Failed to send message(s) to topic [" + topic + "].", excp); - throw new Exception(excp.getMessage(), excp); - } - } - - }catch(Exception e){} -} - //private final rrNvReadable fSettings; + public void sendMessagesNew(String topic, List<? extends message> msgs) throws IOException { + log.info("sending " + msgs.size() + " events to [" + topic + "]"); + try { + for (message o : msgs) { + final ProducerRecord<String, String> data = + new ProducerRecord<>(topic, o.getKey(), o.toString()); + fProducer.send(data); + } + } catch (Exception e) { + log.error("Failed to send message(s) to topic [" + topic + "].", e); + } + } private Producer<String, String> fProducer; @@ -203,14 +190,11 @@ try{ * @param defVal */ private void transferSetting(Properties props, String key, String defVal) { - String kafka_prop= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key); - if (StringUtils.isEmpty(kafka_prop)) kafka_prop=defVal; - //props.put(key, settings.getString("kafka." + key, defVal)); - props.put(key, kafka_prop); + String kafkaProp= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key); + if (StringUtils.isEmpty(kafkaProp)) kafkaProp=defVal; + props.put(key, kafkaProp); } - //private static final Logger log = LoggerFactory.getLogger(KafkaPublisher.class); - private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaPublisher.class); @Override @@ -218,6 +202,4 @@ try{ // TODO Auto-generated method stub } - - -}
\ No newline at end of file +} diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java index 03a1bd5..d7fa28b 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java @@ -222,28 +222,23 @@ public class DMaaPKafkaMetaBroker implements Broker1 { } // create via kafka - - try - { - final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue() ); - final CreateTopicsResult ctr = fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) ); - final KafkaFuture<Void> ctrResult = ctr.all (); - ctrResult.get (); - // underlying Kafka topic created. now setup our API info - return createTopicEntry ( topic, desc, ownerApiKey, transactionEnabled ); - } - catch ( InterruptedException e ) - { - - log.warn ( "Execution of describeTopics timed out." ); - throw new ConfigDbException ( e ); - } - catch ( ExecutionException e ) - { - - log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () ); - throw new ConfigDbException ( e.getCause () ); - } + + try { + final NewTopic topicRequest = + new NewTopic(topic, partitions, (short)replicas); + final CreateTopicsResult ctr = + fKafkaAdminClient.createTopics(Arrays.asList(topicRequest)); + final KafkaFuture<Void> ctrResult = ctr.all(); + ctrResult.get(); + // underlying Kafka topic created. now setup our API info + return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled); + } catch (InterruptedException e) { + log.warn("Execution of describeTopics timed out."); + throw new ConfigDbException(e); + } catch (ExecutionException e) { + log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e); + throw new ConfigDbException(e.getCause()); + } } @@ -348,11 +343,17 @@ public class DMaaPKafkaMetaBroker implements Broker1 { // owner (or it's empty), null is okay -- this is for existing or implicitly // created topics. JSONObject readers = o.optJSONObject ( "readers" ); - if ( readers == null && fOwner.length () > 0 ) readers = kEmptyAcl; + if ( readers == null && fOwner.length () > 0 ) + { + readers = kEmptyAcl; + } fReaders = fromJson ( readers ); JSONObject writers = o.optJSONObject ( "writers" ); - if ( writers == null && fOwner.length () > 0 ) writers = kEmptyAcl; + if ( writers == null && fOwner.length () > 0 ) + { + writers = kEmptyAcl; + } fWriters = fromJson ( writers ); } @@ -445,11 +446,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName ); } - catch ( ConfigDbException x ) - { - throw x; - } - catch ( AccessDeniedException x ) + catch ( ConfigDbException | AccessDeniedException x ) { throw x; } diff --git a/version.properties b/version.properties index 8d21b0b..dba8f85 100644 --- a/version.properties +++ b/version.properties @@ -27,7 +27,7 @@ major=1 minor=1 -patch=17 +patch=18 base_version=${major}.${minor}.${patch} |