summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java43
-rw-r--r--src/main/java/org/onap/aai/event/DMaaPEventBusComponent.java (renamed from src/main/java/org/onap/aai/event/EventBusComponent.java)14
-rw-r--r--src/main/java/org/onap/aai/event/DMaaPEventBusEndpoint.java (renamed from src/main/java/org/onap/aai/event/EventBusEndpoint.java)87
-rw-r--r--src/main/java/org/onap/aai/event/EventBusConsumer.java52
-rw-r--r--src/main/java/org/onap/aai/event/EventBusProducer.java4
-rw-r--r--src/main/java/org/onap/aai/event/KafkaEventBusComponent.java49
-rw-r--r--src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java129
-rw-r--r--src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus1
-rw-r--r--src/main/resources/META-INF/services/org/apache/camel/component/event-bus1
-rw-r--r--src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus1
-rw-r--r--src/test/java/org/onap/aai/event/EventBusTest.java39
11 files changed, 317 insertions, 103 deletions
diff --git a/src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java b/src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java
new file mode 100644
index 0000000..abbfe63
--- /dev/null
+++ b/src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java
@@ -0,0 +1,43 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.event;
+
+import org.apache.camel.Component;
+import org.apache.camel.impl.DefaultEndpoint;
+
+public abstract class AbstractEventBusEndpoint extends DefaultEndpoint {
+ public AbstractEventBusEndpoint() {
+ }
+
+ public AbstractEventBusEndpoint(String endpointUri, Component component) {
+ super(endpointUri, component);
+ }
+
+ public AbstractEventBusEndpoint(String endpointUri) {
+ super(endpointUri);
+ }
+
+ abstract void close();
+ abstract int getPollingDelay();
+ abstract int getPoolSize();
+ abstract String getEventTopic();
+
+}
diff --git a/src/main/java/org/onap/aai/event/EventBusComponent.java b/src/main/java/org/onap/aai/event/DMaaPEventBusComponent.java
index b257c7a..696e0f4 100644
--- a/src/main/java/org/onap/aai/event/EventBusComponent.java
+++ b/src/main/java/org/onap/aai/event/DMaaPEventBusComponent.java
@@ -27,22 +27,22 @@ import org.apache.camel.impl.UriEndpointComponent;
import java.util.Map;
/**
- * Represents the component that manages {@link EventBusEndpoint}.
+ * Represents the component that manages {@link DMaaPEventBusEndpoint}.
*/
-public class EventBusComponent extends UriEndpointComponent {
+public class DMaaPEventBusComponent extends UriEndpointComponent {
- public EventBusComponent() {
- super(EventBusEndpoint.class);
+ public DMaaPEventBusComponent() {
+ super(DMaaPEventBusEndpoint.class);
}
- public EventBusComponent(CamelContext context) {
- super(context, EventBusEndpoint.class);
+ public DMaaPEventBusComponent(CamelContext context) {
+ super(context, DMaaPEventBusEndpoint.class);
}
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters)
throws Exception {
- Endpoint endpoint = new EventBusEndpoint(uri, this);
+ Endpoint endpoint = new DMaaPEventBusEndpoint(uri, this);
setProperties(endpoint, parameters);
return endpoint;
}
diff --git a/src/main/java/org/onap/aai/event/EventBusEndpoint.java b/src/main/java/org/onap/aai/event/DMaaPEventBusEndpoint.java
index ee7cba8..36e930a 100644
--- a/src/main/java/org/onap/aai/event/EventBusEndpoint.java
+++ b/src/main/java/org/onap/aai/event/DMaaPEventBusEndpoint.java
@@ -24,19 +24,19 @@ package org.onap.aai.event;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.eclipse.jetty.util.security.Password;
+import org.onap.aai.event.client.DMaaPEventConsumer;
/**
* Represents a EventBus endpoint.
*/
-@UriEndpoint(scheme = "event-bus", syntax = "event-bus:name",
- consumerClass = EventBusConsumer.class, title = "event-bus")
-public class EventBusEndpoint extends DefaultEndpoint {
+@UriEndpoint(scheme = "dmaap-event-bus", syntax = "dmaap-event-bus:name",
+ consumerClass = EventBusConsumer.class, title = "dmaap-event-bus")
+public class DMaaPEventBusEndpoint extends AbstractEventBusEndpoint {
@UriPath
@Metadata(required = "true")
private String name;
@@ -44,16 +44,16 @@ public class EventBusEndpoint extends DefaultEndpoint {
@UriParam(label = "eventTopic")
@Metadata(required = "true")
private String eventTopic;
- @UriParam(label = "groupName")
+ @UriParam(label = "consumerGroup")
@Metadata(required = "true")
- private String groupName;
- @UriParam(label = "groupId")
+ private String consumerGroup;
+ @UriParam(label = "consumerId")
@Metadata(required = "true")
- private String groupId;
- @UriParam(label = "apiKey")
- private String apiKey;
- @UriParam(label = "apiSecret")
- private String apiSecret;
+ private String consumerId;
+ @UriParam(label = "username")
+ private String username;
+ @UriParam(label = "password")
+ private String password;
@UriParam(label = "url")
@Metadata(required = "true")
private String url;
@@ -63,23 +63,36 @@ public class EventBusEndpoint extends DefaultEndpoint {
@UriParam(label = "pollingDelay")
@Metadata(required = "true", defaultValue="30000")
private int pollingDelay = 30000;
+ @UriParam(label = "transportType")
+ @Metadata(required = "true", defaultValue="HTTPAUTH")
+ private String transportType;
+
+ private DMaaPEventConsumer dmaapConsumer;
- public EventBusEndpoint() {}
+ public DMaaPEventBusEndpoint() {}
- public EventBusEndpoint(String uri, EventBusComponent component) {
+ public DMaaPEventBusEndpoint(String uri, DMaaPEventBusComponent component) {
super(uri, component);
}
- public EventBusEndpoint(String endpointUri) {
+ public DMaaPEventBusEndpoint(String endpointUri) {
super(endpointUri);
}
+
+ @Override
+ void close() {
+ // Don't have to do anything for DMaaP
+ }
+
@Override
public Producer createProducer() throws Exception {
return new EventBusProducer(this);
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
- return new EventBusConsumer(this, processor);
+ // TODO: other overloads based on filled-in properties
+ dmaapConsumer = new DMaaPEventConsumer(url, eventTopic, Password.deobfuscate(username), Password.deobfuscate(password), consumerGroup, consumerId, 15000, 1000, transportType);
+ return new EventBusConsumer(this, processor, dmaapConsumer);
}
@Override
public boolean isSingleton() {
@@ -102,36 +115,38 @@ public class EventBusEndpoint extends DefaultEndpoint {
this.eventTopic = eventTopic;
}
- public String getGroupName() {
- return groupName;
+ public String getConsumerGroup() {
+ return consumerGroup;
}
- public void setGroupName(String groupName) {
- this.groupName = groupName;
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
}
- public String getGroupId() {
- return groupId;
+ public String getConsumerId() {
+ return consumerId;
}
- public void setGroupId(String groupId) {
- this.groupId = groupId;
+ public void setConsumerId(String consumerId) {
+ this.consumerId = consumerId;
}
- public String getApiKey() {
- return apiKey == null ? null : Password.deobfuscate(apiKey);
+ public String getUsername() {
+ return username == null ? null : Password.deobfuscate(username);
+ //return username;
}
- public void setApiKey(String apiKey) {
- this.apiKey = apiKey;
+ public void setUsername(String username) {
+ this.username = username;
}
- public String getApiSecret() {
- return apiSecret == null ? null : Password.deobfuscate(apiSecret);
+ public String getPassword() {
+ return password == null ? null : Password.deobfuscate(password);
+ //return password;
}
- public void setApiSecret(String apiSecret) {
- this.apiSecret = apiSecret;
+ public void setPassword(String password) {
+ this.password = password;
}
public int getPoolSize() {
@@ -157,5 +172,13 @@ public class EventBusEndpoint extends DefaultEndpoint {
public void setUrl(String url) {
this.url = url;
}
+
+ public String getTransportType() {
+ return transportType;
+ }
+
+ public void setTransportType(String transportType) {
+ this.transportType = transportType;
+ }
}
diff --git a/src/main/java/org/onap/aai/event/EventBusConsumer.java b/src/main/java/org/onap/aai/event/EventBusConsumer.java
index a9d5478..e795e2c 100644
--- a/src/main/java/org/onap/aai/event/EventBusConsumer.java
+++ b/src/main/java/org/onap/aai/event/EventBusConsumer.java
@@ -20,9 +20,7 @@
*/
package org.onap.aai.event;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-import com.att.nsa.cambria.client.CambriaConsumer;
+import org.onap.aai.event.api.EventConsumer;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
@@ -33,10 +31,6 @@ import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.cl.mdc.MdcContext;
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-import java.util.Arrays;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -49,46 +43,21 @@ public class EventBusConsumer extends ScheduledPollConsumer {
private Logger logger = LoggerFactory.getInstance().getLogger(EventBusConsumer.class);
private Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(EventBusConsumer.class);
- private final EventBusEndpoint endpoint;
+ private final AbstractEventBusEndpoint endpoint;
- private CambriaConsumer consumer;
+ private EventConsumer consumer;
/**
* EventBusConsumer Constructor.
*/
- public EventBusConsumer(EventBusEndpoint endpoint, Processor processor) {
+ public EventBusConsumer(AbstractEventBusEndpoint endpoint, Processor processor, EventConsumer consumer) {
super(endpoint, processor);
super.setDelay(endpoint.getPollingDelay());
this.endpoint = endpoint;
setScheduledExecutorService(new ScheduledThreadPoolExecutor(endpoint.getPoolSize()));
- String[] urls = endpoint.getUrl().split(",");
-
- List<String> urlList = null;
-
- if (urls != null) {
- urlList = Arrays.asList(urls);
- }
-
- try {
-
- ConsumerBuilder consumerBuilder = new CambriaClientBuilders.ConsumerBuilder()
- .usingHosts(urlList).onTopic(endpoint.getEventTopic())
- .knownAs(endpoint.getGroupName(), endpoint.getGroupId());
-
- String apiKey = endpoint.getApiKey();
- String apiSecret = endpoint.getApiSecret();
-
- if (apiKey != null && apiSecret != null) {
- consumerBuilder.authenticatedBy(endpoint.getApiKey(), endpoint.getApiSecret());
- }
-
- consumer = consumerBuilder.build();
-
- } catch (MalformedURLException | GeneralSecurityException e) {
- logger.error(RouterCoreMsgs.EVENT_CONSUMER_CREATION_EXCEPTION, e, e.getLocalizedMessage());
- }
+ this.consumer = consumer;
}
/**
@@ -104,7 +73,8 @@ public class EventBusConsumer extends ScheduledPollConsumer {
int processCount = 0;
- Iterable<String> messages = consumer.fetch();
+ //Iterable<String> messages = consumer.fetch();
+ Iterable<String> messages = consumer.consumeAndCommit();
String topic = endpoint.getEventTopic();
@@ -119,15 +89,15 @@ public class EventBusConsumer extends ScheduledPollConsumer {
@Override
protected void doStop() throws Exception {
super.doStop();
- if (consumer != null) {
- consumer.close();
+ if (endpoint != null) {
+ endpoint.close();
}
}
@Override
protected void doShutdown() throws Exception {
super.doShutdown();
- if (consumer != null) {
- consumer.close();
+ if (endpoint != null) {
+ endpoint.close();
}
}
diff --git a/src/main/java/org/onap/aai/event/EventBusProducer.java b/src/main/java/org/onap/aai/event/EventBusProducer.java
index ec2a0da..dd8dbed 100644
--- a/src/main/java/org/onap/aai/event/EventBusProducer.java
+++ b/src/main/java/org/onap/aai/event/EventBusProducer.java
@@ -27,9 +27,9 @@ import org.apache.camel.impl.DefaultProducer;
* The EventBus producer.
*/
public class EventBusProducer extends DefaultProducer {
- private EventBusEndpoint endpoint;
+ private AbstractEventBusEndpoint endpoint;
- public EventBusProducer(EventBusEndpoint endpoint) {
+ public EventBusProducer(AbstractEventBusEndpoint endpoint) {
super(endpoint);
this.endpoint = endpoint;
}
diff --git a/src/main/java/org/onap/aai/event/KafkaEventBusComponent.java b/src/main/java/org/onap/aai/event/KafkaEventBusComponent.java
new file mode 100644
index 0000000..07b4c82
--- /dev/null
+++ b/src/main/java/org/onap/aai/event/KafkaEventBusComponent.java
@@ -0,0 +1,49 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.event;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.UriEndpointComponent;
+
+import java.util.Map;
+
+/**
+ * Represents the component that manages {@link KafkaEventBusEndpoint}.
+ */
+public class KafkaEventBusComponent extends UriEndpointComponent {
+
+ public KafkaEventBusComponent() {
+ super(KafkaEventBusEndpoint.class);
+ }
+
+ public KafkaEventBusComponent(CamelContext context) {
+ super(context, KafkaEventBusEndpoint.class);
+ }
+
+ @Override
+ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters)
+ throws Exception {
+ Endpoint endpoint = new KafkaEventBusEndpoint(uri, this);
+ setProperties(endpoint, parameters);
+ return endpoint;
+ }
+}
diff --git a/src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java b/src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java
new file mode 100644
index 0000000..4194c89
--- /dev/null
+++ b/src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java
@@ -0,0 +1,129 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.event;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.onap.aai.event.client.KafkaEventConsumer;
+
+/**
+ * Represents a EventBus endpoint.
+ */
+@UriEndpoint(scheme = "kafka-event-bus", syntax = "kafka-event-bus:name",
+ consumerClass = EventBusConsumer.class, title = "kafka-event-bus")
+public class KafkaEventBusEndpoint extends AbstractEventBusEndpoint {
+ @UriParam(label = "url")
+ @Metadata(required = "true")
+ private String url;
+ @UriParam(label = "eventTopic")
+ @Metadata(required = "true")
+ private String eventTopic;
+ @UriParam(label = "consumerGroup")
+ @Metadata(required = "true")
+ private String consumerGroup;
+ @UriParam(label = "poolSize")
+ @Metadata(required = "true", defaultValue="20")
+ private int poolSize = 20;
+ @UriParam(label = "pollingDelay")
+ @Metadata(required = "true", defaultValue="30000")
+ private int pollingDelay = 30000;
+
+ private KafkaEventConsumer consumer;
+
+ public KafkaEventBusEndpoint(String uri, KafkaEventBusComponent component) {
+ super(uri, component);
+ }
+
+ @Override
+ public Producer createProducer() throws Exception {
+ return new EventBusProducer(this);
+ }
+
+ @Override
+ public Consumer createConsumer(Processor processor) throws Exception {
+ consumer = new KafkaEventConsumer(url, eventTopic, consumerGroup);
+ return new EventBusConsumer(this, processor, consumer);
+ }
+
+ @Override
+ public boolean isSingleton() {
+ return false;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ @Override
+ String getEventTopic() {
+ return eventTopic;
+ }
+
+ public void setEventTopic(String eventTopic) {
+ this.eventTopic = eventTopic;
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+ @Override
+ int getPoolSize() {
+ return poolSize;
+ }
+
+ public void setPoolSize(int poolSize) {
+ this.poolSize = poolSize;
+ }
+
+ @Override
+ void close() {
+ consumer.close();
+ }
+
+ @Override
+ int getPollingDelay() {
+ return pollingDelay;
+ }
+
+ public void setPollingDelay(int pollingDelay) {
+ this.pollingDelay = pollingDelay;
+ }
+
+
+
+
+
+
+}
diff --git a/src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus b/src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus
new file mode 100644
index 0000000..f1fee02
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus
@@ -0,0 +1 @@
+class=org.onap.aai.event.DMaaPEventBusComponent
diff --git a/src/main/resources/META-INF/services/org/apache/camel/component/event-bus b/src/main/resources/META-INF/services/org/apache/camel/component/event-bus
deleted file mode 100644
index f795067..0000000
--- a/src/main/resources/META-INF/services/org/apache/camel/component/event-bus
+++ /dev/null
@@ -1 +0,0 @@
-class=org.onap.aai.event.EventBusComponent
diff --git a/src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus b/src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus
new file mode 100644
index 0000000..89f243d
--- /dev/null
+++ b/src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus
@@ -0,0 +1 @@
+class=org.onap.aai.event.KafkaEventBusComponent
diff --git a/src/test/java/org/onap/aai/event/EventBusTest.java b/src/test/java/org/onap/aai/event/EventBusTest.java
index 8d294cd..1add1c0 100644
--- a/src/test/java/org/onap/aai/event/EventBusTest.java
+++ b/src/test/java/org/onap/aai/event/EventBusTest.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashMap;
-import java.util.Map;
import org.apache.camel.Endpoint;
import org.junit.Before;
@@ -46,23 +45,23 @@ public class EventBusTest {
@Test
public void validateProducer() throws Exception {
try {
- EventBusComponent rc = new EventBusComponent();
- EventBusEndpoint endpoint = new EventBusEndpoint("http://host.com:8443/endpoint", rc);
- endpoint.setApiSecret("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
- endpoint.setApiKey("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
+ DMaaPEventBusComponent rc = new DMaaPEventBusComponent();
+ DMaaPEventBusEndpoint endpoint = new DMaaPEventBusEndpoint("http://host.com:8443/endpoint", rc);
+ endpoint.setPassword("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
+ endpoint.setUsername("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
endpoint.setEventTopic("eventTopic");
- endpoint.setGroupId("groupId");
- endpoint.setGroupName("gn");
+ endpoint.setConsumerId("groupId");
+ endpoint.setConsumerGroup("gn");
endpoint.setName("name");
endpoint.setPoolSize(45);
endpoint.setPollingDelay(10);
endpoint.setUrl("url");
- assertTrue(endpoint.getApiSecret().compareTo("onapSecret") == 0);
- assertTrue(endpoint.getApiKey().compareTo("onapSecret") == 0);
+ assertTrue(endpoint.getPassword().compareTo("onapSecret") == 0);
+ assertTrue(endpoint.getUsername().compareTo("onapSecret") == 0);
assertTrue(endpoint.getEventTopic().compareTo("eventTopic") == 0);
- assertTrue(endpoint.getGroupId().compareTo("groupId") == 0);
- assertTrue(endpoint.getGroupName().compareTo("gn") == 0);
+ assertTrue(endpoint.getConsumerId().compareTo("groupId") == 0);
+ assertTrue(endpoint.getConsumerGroup().compareTo("gn") == 0);
assertTrue(endpoint.getName().compareTo("name") == 0);
assertTrue(endpoint.getPoolSize() == 45);
assertTrue(endpoint.getPollingDelay() == 10);
@@ -84,7 +83,7 @@ public class EventBusTest {
@Test
public void validateEventBusComponent() throws Exception {
- EventBusComponent rc = new EventBusComponent(new TestCamelContext());
+ DMaaPEventBusComponent rc = new DMaaPEventBusComponent(new TestCamelContext());
Endpoint endpoint = rc.createEndpoint("http://host.com:8443/endpoint", null, new HashMap<String, Object>());
assertTrue(endpoint.getEndpointUri().equals("http://host.com:8443/endpoint"));
}
@@ -92,22 +91,22 @@ public class EventBusTest {
@Test
public void validateConsumer() throws Exception {
try {
- EventBusComponent rc = new EventBusComponent();
- EventBusEndpoint endpoint = new EventBusEndpoint("http://host.com:8443/endpoint", rc);
+ DMaaPEventBusComponent rc = new DMaaPEventBusComponent();
+ DMaaPEventBusEndpoint endpoint = new DMaaPEventBusEndpoint("http://host.com:8443/endpoint", rc);
- endpoint.setApiSecret("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
- endpoint.setApiKey("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
+ endpoint.setPassword("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
+ endpoint.setUsername("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
endpoint.setEventTopic("eventTopic");
- endpoint.setGroupId("groupId");
- endpoint.setGroupName("gn");
+ endpoint.setConsumerId("groupId");
+ endpoint.setConsumerGroup("gn");
endpoint.setName("name");
endpoint.setPoolSize(45);
endpoint.setPollingDelay(10);
endpoint.setUrl("url");
TestProcessor processor = new TestProcessor();
- EventBusConsumer consumer = new EventBusConsumer(endpoint, processor);
-
+ EventBusConsumer consumer = (EventBusConsumer)endpoint.createConsumer(processor);
+
}
catch (Exception ex) {
StringWriter writer = new StringWriter();