From 0beea783cef8a84d8bc2655ea678e00d459cc831 Mon Sep 17 00:00:00 2001 From: efiacor Date: Fri, 23 Jul 2021 19:15:53 +0100 Subject: [DMAAP-MR] Remove redundant data Signed-off-by: efiacor Change-Id: I56a8417f72d892705230e94f079db3024170e884 Issue-ID: DMAAP-1638 --- .../mr/cambria/embed/EmbedConfigurationReader.java | 3 +- .../onap/dmaap/mr/cambria/embed/KafkaLocal.java | 17 +++++-- .../dmaap/mr/cambria/embed/ZooKeeperLocal.java | 55 ++++++++++++---------- 3 files changed, 44 insertions(+), 31 deletions(-) (limited to 'src/test/java/org') diff --git a/src/test/java/org/onap/dmaap/mr/cambria/embed/EmbedConfigurationReader.java b/src/test/java/org/onap/dmaap/mr/cambria/embed/EmbedConfigurationReader.java index 3e3fd28..f49f615 100644 --- a/src/test/java/org/onap/dmaap/mr/cambria/embed/EmbedConfigurationReader.java +++ b/src/test/java/org/onap/dmaap/mr/cambria/embed/EmbedConfigurationReader.java @@ -3,6 +3,7 @@ * org.onap.dmaap * ================================================================================ * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Modification copyright (C) 2021 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -95,7 +96,7 @@ public class EmbedConfigurationReader { final Properties props = new Properties (); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); - props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret'"); + props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); fKafkaAdminClient = AdminClient.create ( props ); diff --git a/src/test/java/org/onap/dmaap/mr/cambria/embed/KafkaLocal.java b/src/test/java/org/onap/dmaap/mr/cambria/embed/KafkaLocal.java index 9f3c05a..17c5bbb 100644 --- a/src/test/java/org/onap/dmaap/mr/cambria/embed/KafkaLocal.java +++ b/src/test/java/org/onap/dmaap/mr/cambria/embed/KafkaLocal.java @@ -3,6 +3,7 @@ * ONAP Policy Engine * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Modification copyright (C) 2021 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,16 +21,16 @@ package org.onap.dmaap.mr.cambria.embed; -import kafka.server.KafkaConfig; -import kafka.server.KafkaServerStartable; - import java.io.IOException; import java.util.Properties; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import org.apache.kafka.common.utils.Time; public class KafkaLocal { - public KafkaServerStartable kafka; + public KafkaServer kafka; public ZooKeeperLocal zookeeper; public KafkaLocal(Properties kafkaProperties, Properties zkProperties) throws IOException, InterruptedException{ @@ -38,10 +39,12 @@ public class KafkaLocal { //start local zookeeper System.out.println("starting local zookeeper..."); zookeeper = new ZooKeeperLocal(zkProperties); + zookeeper.run(); System.out.println("done"); //start local kafka broker - kafka = new KafkaServerStartable(kafkaConfig); + final scala.Option prefix = scala.Option.apply("kafka"); + kafka = new KafkaServer(kafkaConfig, Time.SYSTEM, prefix, false); System.out.println("starting local kafka broker..."); kafka.startup(); System.out.println("done"); @@ -52,6 +55,10 @@ public class KafkaLocal { //stop kafka broker System.out.println("stopping kafka..."); kafka.shutdown(); + kafka.awaitShutdown(); + System.out.println("done"); + System.out.println("stopping zookeeper..."); + zookeeper.stop(); System.out.println("done"); } diff --git a/src/test/java/org/onap/dmaap/mr/cambria/embed/ZooKeeperLocal.java b/src/test/java/org/onap/dmaap/mr/cambria/embed/ZooKeeperLocal.java index 97447a8..94939c7 100644 --- a/src/test/java/org/onap/dmaap/mr/cambria/embed/ZooKeeperLocal.java +++ b/src/test/java/org/onap/dmaap/mr/cambria/embed/ZooKeeperLocal.java @@ -3,6 +3,7 @@ * ONAP Policy Engine * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Modification copyright (C) 2021 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,41 +21,45 @@ package org.onap.dmaap.mr.cambria.embed; +import java.util.Properties; import org.apache.zookeeper.server.ServerConfig; import org.apache.zookeeper.server.ZooKeeperServerMain; -import org.apache.zookeeper.server.admin.AdminServer.AdminServerException; import org.apache.zookeeper.server.quorum.QuorumPeerConfig; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Properties; - public class ZooKeeperLocal { - - ZooKeeperServerMain zooKeeperServer; - - public ZooKeeperLocal(Properties zkProperties) throws FileNotFoundException, IOException{ + + ZooKeeperServerMain testingZooKeeperMain = null; + ServerConfig conf; + Thread t1; + + public ZooKeeperLocal(Properties zkProperties) { QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig(); try { quorumConfiguration.parseProperties(zkProperties); } catch(Exception e) { throw new RuntimeException(e); } - - zooKeeperServer = new ZooKeeperServerMain(); - final ServerConfig configuration = new ServerConfig(); - configuration.readFrom(quorumConfiguration); - - - new Thread() { - public void run() { - try { - zooKeeperServer.runFromConfig(configuration); - } catch (IOException | AdminServerException e) { - System.out.println("ZooKeeper Failed"); - e.printStackTrace(System.err); - } - } - }.start(); + conf = new ServerConfig(); + conf.readFrom(quorumConfiguration); } + + public void run() { + if (testingZooKeeperMain == null){ + t1 = new Thread(() -> { + try { + testingZooKeeperMain = new ZooKeeperServerMain(); + testingZooKeeperMain.runFromConfig(conf); + } catch (Exception e) { + System.out.println("Start of Local ZooKeeper Failed"); + e.printStackTrace(System.err); + } + }); + t1.start(); + }} + + public void stop() { + testingZooKeeperMain.close(); + t1.stop(); + } + } -- cgit 1.2.3-korg