diff options
author | Gurjeet Bedi <gurjeetb@amdocs.com> | 2018-06-14 12:20:23 -0400 |
---|---|---|
committer | Gurjeet Bedi <gurjeetb@amdocs.com> | 2018-06-14 12:23:01 -0400 |
commit | 82f4f1ccada97fa394f88cf208ce526ff5ebf751 (patch) | |
tree | 8c00238ce92f2eea8990580f7de0a4dd7acbe540 /src | |
parent | 9f43c23e7fbb4531fd045d081ddb59b5a547833c (diff) |
Make generic router core implementations
Remove dependency of implementation classes of event client
Issue-ID: AAI-1228
Change-Id: I464439e225de6b32941257c090932bd2f6066dc8
Signed-off-by: Gurjeet Bedi <gurjeetb@amdocs.com>
Diffstat (limited to 'src')
13 files changed, 218 insertions, 501 deletions
diff --git a/src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java b/src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java index abbfe63..9a5abee 100644 --- a/src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java +++ b/src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java @@ -24,18 +24,12 @@ import org.apache.camel.Component; import org.apache.camel.impl.DefaultEndpoint; public abstract class AbstractEventBusEndpoint extends DefaultEndpoint { - public AbstractEventBusEndpoint() { - } public AbstractEventBusEndpoint(String endpointUri, Component component) { super(endpointUri, component); } - public AbstractEventBusEndpoint(String endpointUri) { - super(endpointUri); - } - - abstract void close(); + abstract void close() throws Exception; abstract int getPollingDelay(); abstract int getPoolSize(); abstract String getEventTopic(); diff --git a/src/main/java/org/onap/aai/event/DMaaPEventBusComponent.java b/src/main/java/org/onap/aai/event/DMaaPEventBusComponent.java deleted file mode 100644 index 696e0f4..0000000 --- a/src/main/java/org/onap/aai/event/DMaaPEventBusComponent.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.aai.event; - -import org.apache.camel.CamelContext; -import org.apache.camel.Endpoint; -import org.apache.camel.impl.UriEndpointComponent; - -import java.util.Map; - -/** - * Represents the component that manages {@link DMaaPEventBusEndpoint}. - */ -public class DMaaPEventBusComponent extends UriEndpointComponent { - - public DMaaPEventBusComponent() { - super(DMaaPEventBusEndpoint.class); - } - - public DMaaPEventBusComponent(CamelContext context) { - super(context, DMaaPEventBusEndpoint.class); - } - - @Override - protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) - throws Exception { - Endpoint endpoint = new DMaaPEventBusEndpoint(uri, this); - setProperties(endpoint, parameters); - return endpoint; - } -} diff --git a/src/main/java/org/onap/aai/event/DMaaPEventBusEndpoint.java b/src/main/java/org/onap/aai/event/DMaaPEventBusEndpoint.java deleted file mode 100644 index 3cc37e8..0000000 --- a/src/main/java/org/onap/aai/event/DMaaPEventBusEndpoint.java +++ /dev/null @@ -1,184 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.aai.event; - - -import org.apache.camel.Consumer; -import org.apache.camel.Processor; -import org.apache.camel.Producer; -import org.apache.camel.spi.Metadata; -import org.apache.camel.spi.UriEndpoint; -import org.apache.camel.spi.UriParam; -import org.apache.camel.spi.UriPath; -import org.eclipse.jetty.util.security.Password; -import org.onap.aai.event.client.DMaaPEventConsumer; - -/** - * Represents a EventBus endpoint. - */ -@UriEndpoint(scheme = "dmaap-event-bus", syntax = "dmaap-event-bus:name", - consumerClass = EventBusConsumer.class, title = "dmaap-event-bus") -public class DMaaPEventBusEndpoint extends AbstractEventBusEndpoint { - @UriPath - @Metadata(required = "true") - private String name; - - @UriParam(label = "eventTopic") - @Metadata(required = "true") - private String eventTopic; - @UriParam(label = "consumerGroup") - @Metadata(required = "true") - private String consumerGroup; - @UriParam(label = "consumerId") - @Metadata(required = "true") - private String consumerId; - @UriParam(label = "username") - private String username; - @UriParam(label = "password") - private String password; - @UriParam(label = "url") - @Metadata(required = "true") - private String url; - @UriParam(label = "poolSize") - @Metadata(required = "true", defaultValue="20") - private int poolSize = 20; - @UriParam(label = "pollingDelay") - @Metadata(required = "true", defaultValue="30000") - private int pollingDelay = 30000; - @UriParam(label = "transportType") - @Metadata(required = "true", defaultValue="HTTPAUTH") - private String transportType = "HTTPAUTH"; - - private DMaaPEventConsumer dmaapConsumer; - - public DMaaPEventBusEndpoint() {} - - public DMaaPEventBusEndpoint(String uri, DMaaPEventBusComponent component) { - super(uri, component); - } - - public DMaaPEventBusEndpoint(String endpointUri) { - super(endpointUri); - } - - @Override - void close() { - // Don't have to do anything for DMaaP - } - - @Override - public Producer createProducer() throws Exception { - return new EventBusProducer(this); - } - @Override - public Consumer createConsumer(Processor processor) throws Exception { - // TODO: other overloads based on filled-in properties - dmaapConsumer = new DMaaPEventConsumer(url, eventTopic, username, Password.deobfuscate(password), consumerGroup, consumerId, 15000, 1000, transportType); - return new EventBusConsumer(this, processor, dmaapConsumer); - } - @Override - public boolean isSingleton() { - return false; - } - - public void setName(String name) { - this.name = name; - } - - public String getName() { - return name; - } - - public String getEventTopic() { - return eventTopic; - } - - public void setEventTopic(String eventTopic) { - this.eventTopic = eventTopic; - } - - public String getConsumerGroup() { - return consumerGroup; - } - - public void setConsumerGroup(String consumerGroup) { - this.consumerGroup = consumerGroup; - } - - public String getConsumerId() { - return consumerId; - } - - public void setConsumerId(String consumerId) { - this.consumerId = consumerId; - } - - public String getUsername() { - return username == null ? null : Password.deobfuscate(username); - //return username; - } - - public void setUsername(String username) { - this.username = username; - } - - public String getPassword() { - return password == null ? null : Password.deobfuscate(password); - //return password; - } - - public void setPassword(String password) { - this.password = password; - } - - public int getPoolSize() { - return poolSize; - } - - public void setPoolSize(int poolsize) { - this.poolSize = poolsize; - } - - public int getPollingDelay() { - return pollingDelay; - } - - public void setPollingDelay(int pollingDelay) { - this.pollingDelay = pollingDelay; - } - - public String getUrl() { - return url; - } - - public void setUrl(String url) { - this.url = url; - } - - public String getTransportType() { - return transportType; - } - - public void setTransportType(String transportType) { - this.transportType = transportType; - } -} - diff --git a/src/test/java/org/onap/aai/event/TestProcessor.java b/src/main/java/org/onap/aai/event/EventBusComponent.java index e0f2d27..766382b 100644 --- a/src/test/java/org/onap/aai/event/TestProcessor.java +++ b/src/main/java/org/onap/aai/event/EventBusComponent.java @@ -2,8 +2,8 @@ * ============LICENSE_START======================================================= * org.onap.aai * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs + * Copyright © 2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2018 Amdocs * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,18 +18,26 @@ * limitations under the License. * ============LICENSE_END========================================================= */ - package org.onap.aai.event; -import org.apache.camel.Exchange; -import org.apache.camel.Processor; - -public class TestProcessor implements Processor { - - @Override - public void process(Exchange exchange) throws Exception { - // TODO Auto-generated method stub +import java.util.Map; - } +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.impl.DefaultComponent; +public class EventBusComponent extends DefaultComponent { + public EventBusComponent() { + super(); + } + public EventBusComponent(CamelContext context) { + super(context); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + Endpoint endpoint = new EventBusEndPoint(uri, this); + setProperties(endpoint, parameters); + return endpoint; + } } diff --git a/src/main/java/org/onap/aai/event/EventBusConsumer.java b/src/main/java/org/onap/aai/event/EventBusConsumer.java index da8ffb5..b189cfd 100644 --- a/src/main/java/org/onap/aai/event/EventBusConsumer.java +++ b/src/main/java/org/onap/aai/event/EventBusConsumer.java @@ -31,7 +31,6 @@ import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.cl.mdc.MdcContext; -import java.util.Collections; import java.util.UUID; import java.util.concurrent.ScheduledThreadPoolExecutor; diff --git a/src/main/java/org/onap/aai/event/EventBusEndPoint.java b/src/main/java/org/onap/aai/event/EventBusEndPoint.java new file mode 100644 index 0000000..621b30f --- /dev/null +++ b/src/main/java/org/onap/aai/event/EventBusEndPoint.java @@ -0,0 +1,108 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.event; + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.onap.aai.cl.api.Logger; +import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.event.api.EventConsumer; +import org.onap.aai.event.api.EventPublisher; + +@UriEndpoint(scheme = "event-bus", syntax = "event-bus:name", +consumerClass = EventBusConsumer.class, title = "event-bus") +public class EventBusEndPoint extends AbstractEventBusEndpoint { + @UriParam(label = "eventTopic") + @Metadata(required = "true") + private String eventTopic; + @UriParam(label = "poolSize") + @Metadata(required = "true", defaultValue="20") + private int poolSize = 20; + @UriParam(label = "pollingDelay") + @Metadata(required = "true", defaultValue="30000") + private int pollingDelay = 30000; + + EventConsumer consumer; //This would be injected via bean through camel route when passed with # + + EventPublisher publisher; //This would be injected via bean through camel route when passed with # + + private Logger logger = LoggerFactory.getInstance().getLogger(EventBusEndPoint.class); + + public EventBusEndPoint(String uri, EventBusComponent component) { + super(uri, component); + } + + @Override + public Producer createProducer() throws Exception { + return new EventBusProducer(this, publisher); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + return new EventBusConsumer(this, processor, consumer); + } + + @Override + public boolean isSingleton() { + return false; + } + + void close() throws Exception { + if(consumer != null) + consumer.close(); + if(publisher != null) + publisher.close(); + } + + public void setPoolSize(int poolSize) { + this.poolSize = poolSize; + } + + public void setPollingDelay(int pollingDelay) { + this.pollingDelay = pollingDelay; + } + + public int getPollingDelay() { + return pollingDelay; + } + public int getPoolSize() { + return poolSize; + } + public String getEventTopic() { + return eventTopic; + } + + public void setEventTopic(String eventTopic) { + this.eventTopic = eventTopic; + } + + public void setConsumer(EventConsumer consumer) { + this.consumer = consumer; + } + + public void setPublisher(EventPublisher publisher) { + this.publisher = publisher; + } +} diff --git a/src/main/java/org/onap/aai/event/EventBusProducer.java b/src/main/java/org/onap/aai/event/EventBusProducer.java index dd8dbed..dfd1bfe 100644 --- a/src/main/java/org/onap/aai/event/EventBusProducer.java +++ b/src/main/java/org/onap/aai/event/EventBusProducer.java @@ -22,17 +22,22 @@ package org.onap.aai.event; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; +import org.onap.aai.event.api.EventPublisher; /** * The EventBus producer. */ public class EventBusProducer extends DefaultProducer { private AbstractEventBusEndpoint endpoint; + + private EventPublisher publisher; - public EventBusProducer(AbstractEventBusEndpoint endpoint) { - super(endpoint); - this.endpoint = endpoint; + public EventBusProducer(AbstractEventBusEndpoint endpoint, EventPublisher publisher) { + super(endpoint); + this.endpoint = endpoint; + this.publisher = publisher; } + @Override public void process(Exchange exchange) throws Exception { // Publishing to event bus is currently not supported diff --git a/src/main/java/org/onap/aai/event/KafkaEventBusComponent.java b/src/main/java/org/onap/aai/event/KafkaEventBusComponent.java deleted file mode 100644 index 07b4c82..0000000 --- a/src/main/java/org/onap/aai/event/KafkaEventBusComponent.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.aai.event; - -import org.apache.camel.CamelContext; -import org.apache.camel.Endpoint; -import org.apache.camel.impl.UriEndpointComponent; - -import java.util.Map; - -/** - * Represents the component that manages {@link KafkaEventBusEndpoint}. - */ -public class KafkaEventBusComponent extends UriEndpointComponent { - - public KafkaEventBusComponent() { - super(KafkaEventBusEndpoint.class); - } - - public KafkaEventBusComponent(CamelContext context) { - super(context, KafkaEventBusEndpoint.class); - } - - @Override - protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) - throws Exception { - Endpoint endpoint = new KafkaEventBusEndpoint(uri, this); - setProperties(endpoint, parameters); - return endpoint; - } -} diff --git a/src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java b/src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java deleted file mode 100644 index 4194c89..0000000 --- a/src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright © 2017-2018 Amdocs - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.aai.event; - -import org.apache.camel.Consumer; -import org.apache.camel.Processor; -import org.apache.camel.Producer; -import org.apache.camel.impl.DefaultEndpoint; -import org.apache.camel.spi.Metadata; -import org.apache.camel.spi.UriEndpoint; -import org.apache.camel.spi.UriParam; -import org.onap.aai.event.client.KafkaEventConsumer; - -/** - * Represents a EventBus endpoint. - */ -@UriEndpoint(scheme = "kafka-event-bus", syntax = "kafka-event-bus:name", - consumerClass = EventBusConsumer.class, title = "kafka-event-bus") -public class KafkaEventBusEndpoint extends AbstractEventBusEndpoint { - @UriParam(label = "url") - @Metadata(required = "true") - private String url; - @UriParam(label = "eventTopic") - @Metadata(required = "true") - private String eventTopic; - @UriParam(label = "consumerGroup") - @Metadata(required = "true") - private String consumerGroup; - @UriParam(label = "poolSize") - @Metadata(required = "true", defaultValue="20") - private int poolSize = 20; - @UriParam(label = "pollingDelay") - @Metadata(required = "true", defaultValue="30000") - private int pollingDelay = 30000; - - private KafkaEventConsumer consumer; - - public KafkaEventBusEndpoint(String uri, KafkaEventBusComponent component) { - super(uri, component); - } - - @Override - public Producer createProducer() throws Exception { - return new EventBusProducer(this); - } - - @Override - public Consumer createConsumer(Processor processor) throws Exception { - consumer = new KafkaEventConsumer(url, eventTopic, consumerGroup); - return new EventBusConsumer(this, processor, consumer); - } - - @Override - public boolean isSingleton() { - return false; - } - - public String getUrl() { - return url; - } - - public void setUrl(String url) { - this.url = url; - } - - @Override - String getEventTopic() { - return eventTopic; - } - - public void setEventTopic(String eventTopic) { - this.eventTopic = eventTopic; - } - - public String getConsumerGroup() { - return consumerGroup; - } - - public void setConsumerGroup(String consumerGroup) { - this.consumerGroup = consumerGroup; - } - - @Override - int getPoolSize() { - return poolSize; - } - - public void setPoolSize(int poolSize) { - this.poolSize = poolSize; - } - - @Override - void close() { - consumer.close(); - } - - @Override - int getPollingDelay() { - return pollingDelay; - } - - public void setPollingDelay(int pollingDelay) { - this.pollingDelay = pollingDelay; - } - - - - - - -} diff --git a/src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus b/src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus deleted file mode 100644 index f1fee02..0000000 --- a/src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus +++ /dev/null @@ -1 +0,0 @@ -class=org.onap.aai.event.DMaaPEventBusComponent diff --git a/src/main/resources/META-INF/services/org/apache/camel/component/event-bus b/src/main/resources/META-INF/services/org/apache/camel/component/event-bus new file mode 100644 index 0000000..f795067 --- /dev/null +++ b/src/main/resources/META-INF/services/org/apache/camel/component/event-bus @@ -0,0 +1 @@ +class=org.onap.aai.event.EventBusComponent diff --git a/src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus b/src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus deleted file mode 100644 index 89f243d..0000000 --- a/src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus +++ /dev/null @@ -1 +0,0 @@ -class=org.onap.aai.event.KafkaEventBusComponent diff --git a/src/test/java/org/onap/aai/event/EventBusTest.java b/src/test/java/org/onap/aai/event/EventBusTest.java index 1add1c0..c7cd527 100644 --- a/src/test/java/org/onap/aai/event/EventBusTest.java +++ b/src/test/java/org/onap/aai/event/EventBusTest.java @@ -23,17 +23,49 @@ package org.onap.aai.event; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.io.PrintWriter; -import java.io.StringWriter; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; +import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.impl.DefaultMessage; +import org.apache.camel.impl.MessageSupport; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; +import org.onap.aai.event.api.EventConsumer; +import org.onap.aai.event.api.EventPublisher; +import com.att.aft.dme2.hazelcast.core.Message; + +@RunWith(MockitoJUnitRunner.class) public class EventBusTest { + @Mock + public EventConsumer consumer; + + @Mock + public EventPublisher publisher; + + @Mock + public CamelContext context; + + @Mock + public Processor processor; - /** + @Mock + Exchange exchange; + + @Mock + AbstractEventBusEndpoint endPoint; + + /** * Test case initialization * * @throws Exception the exception @@ -44,77 +76,60 @@ public class EventBusTest { @Test public void validateProducer() throws Exception { - try { - DMaaPEventBusComponent rc = new DMaaPEventBusComponent(); - DMaaPEventBusEndpoint endpoint = new DMaaPEventBusEndpoint("http://host.com:8443/endpoint", rc); - endpoint.setPassword("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10"); - endpoint.setUsername("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10"); - endpoint.setEventTopic("eventTopic"); - endpoint.setConsumerId("groupId"); - endpoint.setConsumerGroup("gn"); - endpoint.setName("name"); - endpoint.setPoolSize(45); - endpoint.setPollingDelay(10); - endpoint.setUrl("url"); - - assertTrue(endpoint.getPassword().compareTo("onapSecret") == 0); - assertTrue(endpoint.getUsername().compareTo("onapSecret") == 0); - assertTrue(endpoint.getEventTopic().compareTo("eventTopic") == 0); - assertTrue(endpoint.getConsumerId().compareTo("groupId") == 0); - assertTrue(endpoint.getConsumerGroup().compareTo("gn") == 0); - assertTrue(endpoint.getName().compareTo("name") == 0); - assertTrue(endpoint.getPoolSize() == 45); - assertTrue(endpoint.getPollingDelay() == 10); - assertTrue(endpoint.getUrl().compareTo("url") == 0); - assertFalse(endpoint.isSingleton()); - - EventBusProducer producer = (EventBusProducer)endpoint.createProducer(); - assertTrue(producer.getEndpoint() != null); - } - catch (Exception ex) { - StringWriter writer = new StringWriter(); - PrintWriter printWriter = new PrintWriter( writer ); - ex.printStackTrace( printWriter ); - printWriter.flush(); - System.out.println(writer.toString()); - throw ex; - } + EventBusComponent rc = new EventBusComponent(); + EventBusEndPoint endpoint = new EventBusEndPoint("http://host.com:8443/endpoint", rc); + endpoint.setEventTopic("eventTopic"); + endpoint.setPublisher(publisher); + endpoint.setPoolSize(45); + endpoint.setPollingDelay(10); + + assertTrue(endpoint.getEventTopic().compareTo("eventTopic") == 0); + assertTrue(endpoint.getPoolSize() == 45); + assertTrue(endpoint.getPollingDelay() == 10); + assertFalse(endpoint.isSingleton()); + EventBusProducer producer = (EventBusProducer)endpoint.createProducer(); + assertTrue(producer.getEndpoint() != null); + endpoint.close(); } @Test public void validateEventBusComponent() throws Exception { - DMaaPEventBusComponent rc = new DMaaPEventBusComponent(new TestCamelContext()); + EventBusComponent rc = new EventBusComponent(context); Endpoint endpoint = rc.createEndpoint("http://host.com:8443/endpoint", null, new HashMap<String, Object>()); assertTrue(endpoint.getEndpointUri().equals("http://host.com:8443/endpoint")); } @Test public void validateConsumer() throws Exception { - try { - DMaaPEventBusComponent rc = new DMaaPEventBusComponent(); - DMaaPEventBusEndpoint endpoint = new DMaaPEventBusEndpoint("http://host.com:8443/endpoint", rc); - - endpoint.setPassword("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10"); - endpoint.setUsername("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10"); - endpoint.setEventTopic("eventTopic"); - endpoint.setConsumerId("groupId"); - endpoint.setConsumerGroup("gn"); - endpoint.setName("name"); - endpoint.setPoolSize(45); - endpoint.setPollingDelay(10); - endpoint.setUrl("url"); - - TestProcessor processor = new TestProcessor(); - EventBusConsumer consumer = (EventBusConsumer)endpoint.createConsumer(processor); - - } - catch (Exception ex) { - StringWriter writer = new StringWriter(); - PrintWriter printWriter = new PrintWriter( writer ); - ex.printStackTrace( printWriter ); - printWriter.flush(); - System.out.println(writer.toString()); - throw ex; - } + EventBusComponent rc = new EventBusComponent(); + EventBusEndPoint endpoint = new EventBusEndPoint("http://host.com:8443/endpoint", rc); + + endpoint.setConsumer(consumer); + endpoint.setEventTopic("eventTopic"); + endpoint.setPoolSize(45); + endpoint.setPollingDelay(10); + + assertTrue(endpoint.getEventTopic().compareTo("eventTopic") == 0); + assertTrue(endpoint.getPoolSize() == 45); + assertTrue(endpoint.getPollingDelay() == 10); + assertFalse(endpoint.isSingleton()); + + EventBusConsumer consumer = (EventBusConsumer)endpoint.createConsumer(processor); + } + + @Test + public void validateConsumerPoll() throws Exception { + MessageSupport me = new DefaultMessage(context); + List<String> list = new ArrayList<>(); + list.add("Message 1"); + list.add("Message 2"); + + Mockito.when(consumer.consumeAndCommit()).thenReturn(list); + Mockito.when(endPoint.createExchange()).thenReturn(exchange); + Mockito.when(exchange.getIn()).thenReturn(me); + Mockito.when(exchange.getOut()).thenReturn(me); + + EventBusConsumer busConsumer = new EventBusConsumer(endPoint, processor, consumer); + int messages = busConsumer.poll(); } } |