From 2a2b5d085876480c1b0d9470a57c6cab4f51008c Mon Sep 17 00:00:00 2001 From: Sirisha_Manchikanti Date: Fri, 1 Jul 2022 07:15:00 +0100 Subject: Introduce Custom Kafka End point Issue-ID: POLICY-4133 Signed-off-by: Sirisha_Manchikanti Change-Id: I2745f3af97e9bb83d94c5cb6d29dfd452d315506 --- .../event/comm/bus/KafkaTopicFactoryTestBase.java | 48 +++++++ .../event/comm/bus/KafkaTopicPropertyBuilder.java | 75 ++++++++++ .../event/comm/bus/KafkaTopicSinkTest.java | 34 +++++ .../comm/bus/KafkaTopicSourceFactoryTest.java | 158 +++++++++++++++++++++ .../event/comm/bus/KafkaTopicSourceTest.java | 34 +++++ .../event/comm/bus/internal/BusConsumerTest.java | 12 ++ .../SingleThreadedKafkaTopicSourceTest.java | 63 ++++++++ 7 files changed, 424 insertions(+) create mode 100644 policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactoryTestBase.java create mode 100644 policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java create mode 100644 policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkTest.java create mode 100644 policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java create mode 100644 policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceTest.java create mode 100644 policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java (limited to 'policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event') diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactoryTestBase.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactoryTestBase.java new file mode 100644 index 00000000..3986549c --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactoryTestBase.java @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; + +import java.util.Collections; +import org.onap.policy.common.endpoints.event.comm.Topic; + +/** + * Base class for KafkaTopicXxxFactory tests. + * + * @param type of topic managed by the factory + */ +public abstract class KafkaTopicFactoryTestBase extends BusTopicFactoryTestBase { + + @Override + public void testBuildBusTopicParams_Ex() { + + super.testBuildBusTopicParams_Ex(); + + // null servers + assertThatIllegalArgumentException().as("null servers") + .isThrownBy(() -> buildTopic(makeBuilder().servers(null).build())); + + // empty servers + assertThatIllegalArgumentException().as("empty servers") + .isThrownBy(() -> buildTopic(makeBuilder().servers(Collections.emptyList()).build())); + } +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java new file mode 100644 index 00000000..1a815e1a --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java @@ -0,0 +1,75 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_EFFECTIVE_TOPIC; +import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_PARTITION; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX; + +import java.util.Arrays; +import lombok.Getter; +import org.onap.policy.common.endpoints.parameters.TopicParameters; + +public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder { + + public static final String SERVER = "my-server"; + public static final String TOPIC2 = "my-topic-2"; + + @Getter + private TopicParameters params = new TopicParameters(); + + /** + * Constructs the object. + * + * @param prefix the prefix for the properties to be built + */ + public KafkaTopicPropertyBuilder(String prefix) { + super(prefix); + } + + /** + * Adds a topic and configures it's properties with default values. + * + * @param topic the topic to be added + * @return this builder + */ + public KafkaTopicPropertyBuilder makeTopic(String topic) { + addTopic(topic); + + setTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, MY_EFFECTIVE_TOPIC); + setTopicProperty(PROPERTY_MANAGED_SUFFIX, "true"); + setTopicProperty(PROPERTY_HTTP_HTTPS_SUFFIX, "true"); + setTopicProperty(PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, MY_PARTITION); + setTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER); + + params.setTopicCommInfrastructure("kafka"); + params.setTopic(topic); + params.setEffectiveTopic(MY_EFFECTIVE_TOPIC); + params.setManaged(true); + params.setUseHttps(true); + params.setPartitionId(MY_PARTITION); + params.setServers(Arrays.asList(SERVER)); + + return this; + } +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkTest.java new file mode 100644 index 00000000..503e5131 --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkTest.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * ONAP Policy Engine - Common Modules + * ================================================================================ + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import static org.junit.Assert.assertNotNull; + +import org.junit.Test; + +public class KafkaTopicSinkTest { + + @Test + public void test() { + assertNotNull(KafkaTopicFactories.getSinkFactory()); + } + +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java new file mode 100644 index 00000000..6fa80a41 --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java @@ -0,0 +1,158 @@ +/* + * ============LICENSE_START======================================================= + * ONAP Policy Engine - Common Modules + * ================================================================================ + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS; + +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.Topic; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; + +public class KafkaTopicSourceFactoryTest extends KafkaTopicFactoryTestBase { + + private SourceFactory factory; + + /** + * Creates the object to be tested. + */ + @Before + @Override + public void setUp() { + super.setUp(); + + factory = new SourceFactory(); + } + + @After + public void tearDown() { + factory.destroy(); + } + + @Test + @Override + public void testBuildBusTopicParams() { + super.testBuildBusTopicParams_Ex(); + } + + @Test + @Override + public void testBuildProperties() { + + initFactory(); + + List topics = buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build()); + assertEquals(1, topics.size()); + assertEquals(MY_TOPIC, topics.get(0).getTopic()); + } + + @Test + @Override + public void testDestroyString_testGet_testInventory() { + super.testDestroyString_Ex(); + } + + @Test + public void testGet() { + super.testGet_Ex(); + } + + @Test + public void testToString() { + assertTrue(factory.toString().startsWith("IndexedKafkaTopicSourceFactory [")); + } + + @Override + protected void initFactory() { + if (factory != null) { + factory.destroy(); + } + + factory = new SourceFactory(); + } + + @Override + protected List buildTopics(Properties properties) { + return factory.build(properties); + } + + @Override + protected KafkaTopicSource buildTopic(BusTopicParams params) { + return factory.build(params); + } + + @Override + protected KafkaTopicSource buildTopic(List servers, String topic) { + return factory.build(servers, topic); + } + + @Override + protected void destroyFactory() { + factory.destroy(); + } + + @Override + protected void destroyTopic(String topic) { + factory.destroy(topic); + } + + @Override + protected List getInventory() { + return factory.inventory(); + } + + @Override + protected KafkaTopicSource getTopic(String topic) { + return factory.get(topic); + } + + @Override + protected BusTopicParams getLastParams() { + return factory.params.getLast(); + } + + @Override + protected TopicPropertyBuilder makePropBuilder() { + return new KafkaTopicPropertyBuilder(PROPERTY_KAFKA_SOURCE_TOPICS); + } + + /** + * Factory that records the parameters of all of the sources it creates. + */ + private static class SourceFactory extends IndexedKafkaTopicSourceFactory { + private Deque params = new LinkedList<>(); + + @Override + protected KafkaTopicSource makeSource(BusTopicParams busTopicParams) { + params.add(busTopicParams); + return super.makeSource(busTopicParams); + } + } +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceTest.java new file mode 100644 index 00000000..ee2d1d7b --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceTest.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * ONAP Policy Engine - Common Modules + * ================================================================================ + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import static org.junit.Assert.assertNotNull; + +import org.junit.Test; + +public class KafkaTopicSourceTest { + + @Test + public void verifyKafkaTopicFactoriesNotNull() { + assertNotNull(KafkaTopicFactories.getSourceFactory()); + } + +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java index 21050f97..da9f792b 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java @@ -46,6 +46,7 @@ import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.Dmaa import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.powermock.reflect.Whitebox; @@ -295,6 +296,17 @@ public class BusConsumerTest extends TopicTestBase { new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build()); } + @Test + public void testKafkaConsumerWrapper() throws Exception { + // verify that different wrappers can be built + assertThatCode(() -> new KafkaConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException(); + } + + @Test + public void testKafkaConsumerWrapperToString() throws Exception { + assertNotNull(new KafkaConsumerWrapper(makeBuilder().build()) {}.toString()); + } + private static class FetchingBusConsumerImpl extends FetchingBusConsumer { protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) { diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java new file mode 100644 index 00000000..cc096585 --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java @@ -0,0 +1,63 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase; +import org.onap.policy.common.utils.gson.GsonTestUtils; + +public class SingleThreadedKafkaTopicSourceTest extends TopicTestBase { + private SingleThreadedKafkaTopicSource source; + + /** + * Creates the object to be tested. + */ + @Before + @Override + public void setUp() { + super.setUp(); + + source = new SingleThreadedKafkaTopicSource(makeBuilder().build()); + } + + @After + public void tearDown() { + source.shutdown(); + } + + @Test + public void testToString() { + assertTrue(source.toString().startsWith("SingleThreadedKafkaTopicSource [")); + } + + @Test + public void testGetTopicCommInfrastructure() { + assertEquals(CommInfrastructure.KAFKA, source.getTopicCommInfrastructure()); + } + +} -- cgit 1.2.3-korg