aboutsummaryrefslogtreecommitdiffstats
path: root/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle
diff options
context:
space:
mode:
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle')
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/pom.xml41
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CallableConsumer.java58
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Consumer.java68
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapDestination.java26
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/EventSender.java35
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Manager.java45
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Producer.java46
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventHeader.java64
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventMessage.java99
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventStatus.java56
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java (renamed from appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/AppcDmaapAdapterActivator.java)2
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/CommonHttpClient.java (renamed from appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CommonHttpClient.java)2
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java (renamed from appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapConsumer.java)27
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java (renamed from appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapProducer.java)20
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java231
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java220
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java83
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java (renamed from appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/impl/EventSenderImpl.java)100
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/OSGI-INF/blueprint/blueprint.xml4
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/consumer.properties57
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/preferredRoute.txt1
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/producer.properties52
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/SimpleExamplePublisher.java134
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestAppcDmaapAdapterActivator.java (renamed from appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/dmaap/TestAppcDmaapAdapterActivator.java)4
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapConsuming.java84
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapEventSender.java169
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapProducing.java80
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/impl/TestConsumerProducerImpl.java242
28 files changed, 1479 insertions, 571 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/pom.xml b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/pom.xml
index 44ca150d1..449ce92d2 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/pom.xml
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/pom.xml
@@ -14,13 +14,17 @@
<dependencies>
<dependency>
<groupId>org.openecomp.appc</groupId>
+ <artifactId>appc-message-adapter-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.openecomp.appc</groupId>
<artifactId>appc-metric-bundle</artifactId>
- <version>1.1.0-SNAPSHOT</version>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.openecomp.appc</groupId>
<artifactId>appc-common</artifactId>
- <classifier>jar-with-dependencies</classifier>
<version>${project.version}</version>
</dependency>
@@ -65,7 +69,6 @@
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
- <version>1.6.2</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -85,10 +88,17 @@
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
- <version>1.6.2</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ </dependency>
<dependency>
<groupId>org.openecomp.sdnc.core</groupId>
<artifactId>sli-common</artifactId>
@@ -119,6 +129,14 @@
</exclusions>
</dependency>
+ <!-- DMaaP Client -->
+ <dependency>
+ <groupId>com.att.nsa</groupId>
+ <artifactId>dmaapClient</artifactId>
+ <version>0.2.12</version>
+<!-- <version>${dmaap.client.version}</version> -->
+ </dependency>
+
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
@@ -129,8 +147,6 @@
</dependency>
</dependencies>
-
-
<build>
<plugins>
<plugin>
@@ -139,12 +155,13 @@
<extensions>true</extensions>
<configuration>
<instructions>
- <Bundle-SymbolicName>org.openecomp.appc.adapter.dmaap</Bundle-SymbolicName>
- <Bundle-Activator>org.openecomp.appc.adapter.dmaap.AppcDmaapAdapterActivator</Bundle-Activator>
- <Export-Package>org.openecomp.appc.adapter.dmaap.*</Export-Package>
- <Export-Serice>org.openecomp.appc.adapter.dmaap.EventSender</Export-Serice>
- <Import-Package>org.openecomp.appc.metricservice.*,org.openecomp.sdnc.sli.*,org.osgi.framework.*,!org.osgi.service.event.*,org.osgi.service.*,org.osgi.util.*,org.slf4j.*,com.vmware.*,org.apache.xerces.*,javax.net.ssl.*,org.xml.sax.*,javax.xml.*,javax.naming.*,javax.crypto.*, com.sun.jersey.spi.container.servlet,org.eclipse.jetty.servlets</Import-Package>
- <Embed-Dependency>*;scope=compile|runtime;artifactId=!appc-metric-bundle|sli-common|org.eclipse.osgi|slf4j-api|jcl-over-slf4j|mysql-connector-java|xml-apis|pax-*</Embed-Dependency>
+ <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+ <Bundle-Version>${project.version}</Bundle-Version>
+ <!--<Bundle-SymbolicName>org.openecomp.appc.adapter.messaging.dmaap</Bundle-SymbolicName>-->
+ <Bundle-Activator>org.openecomp.appc.adapter.messaging.dmaap.AppcDmaapAdapterActivator</Bundle-Activator>
+ <Export-Package>org.openecomp.appc.adapter.messaging.*</Export-Package>
+ <Import-Package>!org.slf4j.event,org.openecomp.appc.adapter.message.*,org.openecomp.appc.metricservice.*,com.att.nsa.*,org.openecomp.sdnc.sli.*,org.osgi.framework.*,!org.osgi.service.event.*,org.osgi.service.*,org.osgi.util.*,org.slf4j.*,com.vmware.*,org.apache.xerces.*,javax.net.ssl.*,org.xml.sax.*,javax.xml.*,javax.naming.*,javax.crypto.*, com.sun.jersey.spi.container.servlet,org.eclipse.jetty.servlets</Import-Package>
+ <Embed-Dependency>*;scope=compile|runtime;artifactId=!appc-metric-bundle|appc-message-adapter-api|sli-common|org.eclipse.osgi|slf4j-api|jcl-over-slf4j|mysql-connector-java|xml-apis|pax-*</Embed-Dependency>
<Embed-Transitive>true</Embed-Transitive>
<Bundle-Blueprint>OSGI-INF/blueprint/blueprint.xml</Bundle-Blueprint>
</instructions>
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CallableConsumer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CallableConsumer.java
deleted file mode 100644
index 7c282911d..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CallableConsumer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * openECOMP : APP-C
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights
- * reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.openecomp.appc.adapter.dmaap;
-
-import java.util.List;
-import java.util.concurrent.Callable;
-
-public class CallableConsumer implements Callable<List<String>> {
-
- private Consumer consumer;
-
- private int timeout = 15000;
- private int limit = 1000;
-
- public CallableConsumer(Consumer c) {
- this.consumer = c;
- }
-
- public CallableConsumer(Consumer c, int waitMs, int fetchSize) {
- this.consumer = c;
- this.timeout = waitMs;
- this.limit = fetchSize;
- }
-
- @Override
- public List<String> call() {
- return consumer.fetch(timeout, limit);
- }
-
- /**
- * The maximum amount of time to keep a connection alive. Currently is set to waitMs + 10s
- *
- * @return An integer representing the maximum amount of time to keep this thread alive
- */
- public int getMaxLife() {
- return 10000 + timeout;
- }
-
-}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Consumer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Consumer.java
deleted file mode 100644
index 32034e5fb..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Consumer.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * openECOMP : APP-C
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights
- * reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.openecomp.appc.adapter.dmaap;
-
-import java.util.List;
-
-public interface Consumer {
-
- /**
- * Gets a batch of messages from the topic. Defaults to 1000 messages with 15s wait for messages if empty.
- *
- * @return A list of strings representing the messages pulled from the topic.
- */
- public List<String> fetch();
-
- /**
- * Gets a batch of messages from the topic.
- *
- * @param waitMs
- * The amount of time to wait in milliseconds if the topic is empty for data to be written. Should be no
- * less than 15000ms to prevent too many requests
- * @param limit
- * The amount of messages to fetch
- * @return A list of strings representing the messages pulled from the topic.
- */
- public List<String> fetch(int waitMs, int limit);
-
- /**
- * Updates the api credentials for making authenticated requests
- *
- * @param apiKey
- * The public key to authenticate with
- * @param apiSecret
- * The secret key to authenticate with
- */
- public void updateCredentials(String apiKey, String apiSecret);
-
- // TODO - Implement once Cambria allows you to set outside of constructor
- // public void setFilter(String filter);
-
- /**
- * Creates a dmaap client using a https connection
- *
- * @param yes
- * True if https should be used, false otherwise
- */
- public void useHttps(boolean yes);
-
-}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapDestination.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapDestination.java
deleted file mode 100644
index efbe194ba..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapDestination.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * openECOMP : APP-C
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights
- * reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.openecomp.appc.adapter.dmaap;
-
-public enum DmaapDestination {
- DCAE
-}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/EventSender.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/EventSender.java
deleted file mode 100644
index 7d4a7c090..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/EventSender.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * openECOMP : APP-C
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights
- * reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.openecomp.appc.adapter.dmaap;
-
-import java.util.Map;
-
-import org.openecomp.appc.adapter.dmaap.event.EventMessage;
-import org.openecomp.appc.exceptions.APPCException;
-import org.openecomp.sdnc.sli.SvcLogicContext;
-import org.openecomp.sdnc.sli.SvcLogicJavaPlugin;
-
-
-public interface EventSender extends SvcLogicJavaPlugin{
- boolean sendEvent(DmaapDestination destination, EventMessage msg);
- boolean sendEvent(DmaapDestination destination, Map<String, String> params, SvcLogicContext ctx) throws APPCException;
-}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Manager.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Manager.java
deleted file mode 100644
index 183e618ba..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Manager.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * openECOMP : APP-C
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights
- * reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.openecomp.appc.adapter.dmaap;
-
-import java.util.Set;
-
-public interface Manager {
-
- /**
- * Updates the api credentials for making authenticated requests
- *
- * @param apiKey
- * The public key to authenticate with
- * @param apiSecret
- * The secret key to authenticate with
- */
- public void updateCredentials(String apiKey, String apiSecret);
-
- /**
- * Return a set of strings representing topics that the user can see
- *
- * @return A set of strings with topic names or an empty set if no topics are visible
- */
- public Set<String> getTopics();
-
-}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Producer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Producer.java
deleted file mode 100644
index f19c516be..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Producer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * openECOMP : APP-C
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights
- * reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.openecomp.appc.adapter.dmaap;
-
-public interface Producer {
-
- public boolean post(String partition, String data);
-
- /**
- * Updates the api credentials for making authenticated requests
- *
- * @param apiKey
- * The public key to authenticate with
- * @param apiSecret
- * The secret key to authenticate with
- */
- public void updateCredentials(String apiKey, String apiSecret);
-
- /**
- * Creates a dmaap client using a https connection
- *
- * @param yes
- * True if https should be used, false otherwise
- */
- public void useHttps(boolean yes);
-
-}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventHeader.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventHeader.java
deleted file mode 100644
index dd951fe37..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventHeader.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * openECOMP : APP-C
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights
- * reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.openecomp.appc.adapter.dmaap.event;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-
-public class EventHeader {
-
- @JsonProperty("eventTime")
- private final String eventTime;
-
- @JsonProperty("apiVer")
- private final String apiVer;
-
- @JsonProperty("eventId")
- private final String eventId;
-
- public EventHeader(String eventTime, String apiVer, String eventId) {
- this.eventTime = eventTime;
- this.apiVer = apiVer;
- this.eventId = eventId;
- }
-
- public String getEventTime() {
- return eventTime;
- }
-
- public String getApiVer() {
- return apiVer;
- }
-
- public String getEventId() {
- return eventId;
- }
-
- @Override
- public String toString() {
- return "EventHeader{" +
- "eventTime='" + eventTime + '\'' +
- ", apiVer='" + apiVer + '\'' +
- ", eventId='" + eventId + '\'' +
- '}';
- }
-}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventMessage.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventMessage.java
deleted file mode 100644
index af5cff2f9..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventMessage.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * openECOMP : APP-C
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights
- * reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.openecomp.appc.adapter.dmaap.event;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize.Inclusion;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/*
- {
- "EventHeader": {
- "eventTime": "2016-03-15T10:59:33.79Z",
- "apiVer": "1.01",
- "EventId": "<ECOMP_EVENT_ID>",
- },
- "EventStatus": {
- "code": "NNN",
- "reason": "A reason"
- }
- }
-*/
-
-
-
-
-@JsonSerialize(include = Inclusion.NON_NULL)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class EventMessage implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
- @JsonProperty("eventHeader")
- private EventHeader eventHeader;
- @JsonProperty("eventStatus")
- private EventStatus eventStatus;
-
- public EventMessage(EventHeader eventHeader, EventStatus eventStatus) {
- this.eventHeader = eventHeader;
- this.eventStatus = eventStatus;
- }
-
- public EventHeader getEventHeader() {
- return eventHeader;
- }
-
- public void setEventHeader(EventHeader eventHeader) {
- this.eventHeader = eventHeader;
- }
-
- public EventStatus getEventStatus() {
- return eventStatus;
- }
-
- public void setEventStatus(EventStatus eventStatus) {
- this.eventStatus = eventStatus;
- }
-
- public String toJson() {
- try {
- return OBJECT_MAPPER.writeValueAsString(this);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public String toString() {
- return "EventMessage{" +
- "eventHeader=" + eventHeader +
- ", eventStatus=" + eventStatus +
- '}';
- }
-}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventStatus.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventStatus.java
deleted file mode 100644
index f5d7a59d4..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventStatus.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * openECOMP : APP-C
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights
- * reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.openecomp.appc.adapter.dmaap.event;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-
-public class EventStatus {
-
- @JsonProperty("code")
- private final Integer code;
-
- @JsonProperty("reason")
- private final String reason;
-
- public EventStatus(Integer code, String aReason) {
- this.code = code;
- reason = aReason;
- }
-
-
- public Integer getCode() {
- return code;
- }
-
- public String getReason() {
- return reason;
- }
-
- @Override
- public String toString() {
- return "EventStatus{" +
- "code=" + code +
- ", reason='" + reason + '\'' +
- '}';
- }
-}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/AppcDmaapAdapterActivator.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java
index c02553dfe..c7be330cf 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/AppcDmaapAdapterActivator.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java
@@ -19,7 +19,7 @@
* ============LICENSE_END=========================================================
*/
-package org.openecomp.appc.adapter.dmaap;
+package org.openecomp.appc.adapter.messaging.dmaap;
import org.openecomp.appc.configuration.ConfigurationFactory;
import com.att.eelf.configuration.EELFLogger;
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CommonHttpClient.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/CommonHttpClient.java
index 654ec6f7f..0d0450681 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CommonHttpClient.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/CommonHttpClient.java
@@ -19,7 +19,7 @@
* ============LICENSE_END=========================================================
*/
-package org.openecomp.appc.adapter.dmaap;
+package org.openecomp.appc.adapter.messaging.dmaap.http;
import java.net.URI;
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapConsumer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
index 6e16d896b..2145eaa70 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapConsumer.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
@@ -19,14 +19,13 @@
* ============LICENSE_END=========================================================
*/
-package org.openecomp.appc.adapter.dmaap;
+package org.openecomp.appc.adapter.messaging.dmaap.http;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
@@ -38,10 +37,11 @@ import org.apache.http.client.utils.URIBuilder;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.json.JSONArray;
+import org.openecomp.appc.adapter.message.Consumer;
-public class DmaapConsumer extends CommonHttpClient implements Consumer {
+public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer {
- private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumer.class);
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class);
// Default values
private static final int DEFAULT_TIMEOUT_MS = 15000;
@@ -54,17 +54,17 @@ public class DmaapConsumer extends CommonHttpClient implements Consumer {
private boolean useHttps = false;
- public DmaapConsumer(Collection<String> hosts, String topicName, String consumerName, String consumerId) {
+ public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId) {
this(hosts, topicName, consumerName, consumerId, null);
}
- public DmaapConsumer(Collection<String> hosts, String topicName, String consumerName, String consumerId,
- String filter) {
+ public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
+ String filter) {
this(hosts, topicName, consumerName, consumerId, filter, null, null);
}
- public DmaapConsumer(Collection<String> hosts, String topicName, String consumerName, String consumerId,
- String filter, String user, String password) {
+ public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
+ String filter, String user, String password) {
urls = new ArrayList<String>();
for (String host : hosts) {
urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId));
@@ -155,9 +155,10 @@ public class DmaapConsumer extends CommonHttpClient implements Consumer {
LOG.error("Interrupted while sleeping");
}
}
-
- public void close(){
- //not used yet
- }
+
+ @Override
+ public void close() {
+ // Nothing to do
+ }
}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapProducer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java
index 6845177b1..85e446dca 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapProducer.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java
@@ -19,7 +19,7 @@
* ============LICENSE_END=========================================================
*/
-package org.openecomp.appc.adapter.dmaap;
+package org.openecomp.appc.adapter.messaging.dmaap.http;
import java.net.URI;
import java.util.ArrayList;
@@ -34,10 +34,11 @@ import com.att.eelf.configuration.EELFManager;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
+import org.openecomp.appc.adapter.message.Producer;
-public class DmaapProducer extends CommonHttpClient implements Producer {
+public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer {
- private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducer.class);
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapProducerImpl.class);
private static final String CONTENT_TYPE = "application/cambria";
private static final String URL_TEMPLATE = "%s/events/%s";
@@ -47,7 +48,7 @@ public class DmaapProducer extends CommonHttpClient implements Producer {
private boolean useHttps = false;
- public DmaapProducer(Collection<String> urls, String topicName) {
+ public HttpDmaapProducerImpl(Collection<String> urls, String topicName) {
hosts = new ArrayList<String>();
topics = new HashSet<String>();
topics.add(topicName);
@@ -57,7 +58,7 @@ public class DmaapProducer extends CommonHttpClient implements Producer {
}
}
- public DmaapProducer(Collection<String> urls, Set<String> topicNames) {
+ public HttpDmaapProducerImpl(Collection<String> urls, Set<String> topicNames) {
hosts = new ArrayList<String>();
topics = topicNames;
@@ -126,8 +127,9 @@ public class DmaapProducer extends CommonHttpClient implements Producer {
String m = (msg == null) ? "" : msg;
return String.format("%d.%d.%s%s", p.length(), m.length(), p, m);
}
-
- public void close(){
- //not used yet
- }
+
+ @Override
+ public void close() {
+ // Nothing to do
+ }
}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java
new file mode 100644
index 000000000..342d52448
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java
@@ -0,0 +1,231 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.adapter.messaging.dmaap.impl;
+
+import java.io.IOException;
+import java.util.*;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+//import com.att.nsa.cambria.client.CambriaClientBuilders;
+//import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
+//import com.att.nsa.cambria.client.CambriaConsumer;
+
+import com.att.nsa.mr.client.MRClientFactory;
+import com.att.nsa.mr.client.MRConsumer;
+import org.apache.commons.lang3.StringUtils;
+import org.openecomp.appc.adapter.message.Consumer;
+import org.openecomp.appc.configuration.Configuration;
+import org.openecomp.appc.configuration.ConfigurationFactory;
+import org.openecomp.appc.metricservice.MetricRegistry;
+import org.openecomp.appc.metricservice.MetricService;
+import org.openecomp.appc.metricservice.impl.MetricServiceImpl;
+import org.openecomp.appc.metricservice.metric.Metric;
+import org.openecomp.appc.metricservice.metric.MetricType;
+import org.openecomp.appc.metricservice.metric.DmaapRequestCounterMetric;
+import org.openecomp.appc.metricservice.policy.PublishingPolicy;
+import org.openecomp.appc.metricservice.publisher.LogPublisher;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.ServiceReference;
+
+public class DmaapConsumerImpl implements Consumer {
+
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class);
+ private static final Configuration configuration = ConfigurationFactory.getConfiguration();
+ // Default values
+ private static final int DEFAULT_TIMEOUT_MS = 60000;
+ private static final int DEFAULT_LIMIT = 1000;
+ private static MetricRegistry metricRegistry;
+ private String topic;
+ private DmaapRequestCounterMetric dmaapKpiMetric;
+ private boolean isMetricEnabled=false;
+ private boolean useHttps = false;
+ private MRConsumer client = null;
+ private Properties props = null;
+
+
+ public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,String user, String password) {
+ this(urls, topicName, consumerGroupName, consumerId,user, password,null);
+
+ }
+
+ public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,String user, String password,String filter) {
+ this.topic = topicName;
+ this.props = new Properties();
+ String urlsStr = StringUtils.join(urls, ',');
+ props.setProperty("host",urlsStr);
+ props.setProperty("group",consumerGroupName);
+ props.setProperty("id",consumerId);
+ props.setProperty("username",user);
+ props.setProperty("password",password);
+ if(filter != null) {
+ props.setProperty("filter", filter);
+ }
+ }
+
+
+ private void initMetric() {
+ LOG.debug("Metric getting initialized");
+ MetricService metricService = getMetricservice();
+ metricRegistry = metricService.createRegistry("APPC");
+ dmaapKpiMetric = metricRegistry.metricBuilderFactory().
+ dmaapRequestCounterBuilder().
+ withName("DMAAP_KPI").withType(MetricType.COUNTER).
+ withRecievedMessage(0)
+ .withPublishedMessage(0)
+ .build();
+ if (metricRegistry.register(dmaapKpiMetric)) {
+ Metric[] metrics = new Metric[]{dmaapKpiMetric};
+ LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
+ LogPublisher[] logPublishers = new LogPublisher[1];
+ logPublishers[0] = logPublisher;
+ PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory().
+ scheduledPolicyBuilder().withPublishers(logPublishers).
+ withMetrics(metrics).
+ build();
+ LOG.debug("Policy getting initialized");
+ manuallyScheduledPublishingPolicy.init();
+ LOG.debug("Metric initialized");
+ }
+ }
+ private MRConsumer getClient() {
+ return getClient(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT);
+ }
+
+ /**
+ * @return An instance of MRConsumer created from our class variables
+ */
+ private synchronized MRConsumer getClient(int waitMs, int limit) {
+ try {
+ props.setProperty("timeout",String.valueOf(waitMs));
+ props.setProperty("limit",String.valueOf(limit));
+ String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic,props);
+ return MRClientFactory.createConsumer ( topicProducerPropFileName);
+ } catch (IOException e1) {
+ LOG.error("failed to createConsumer",e1);
+ return null;
+ }
+ }
+
+ @Override
+ public synchronized void updateCredentials(String key, String secret) {
+ LOG.info(String.format("Setting auth to %s for %s", key, this.toString()));
+ String user = key;
+ String password = secret;
+ props.setProperty("user",String.valueOf(user));
+ props.setProperty("password",String.valueOf(password));
+ client = null;
+ }
+
+ @Override
+ public List<String> fetch(int waitMs, int limit) {
+ Properties properties=configuration.getProperties();
+ if(properties!=null && properties.getProperty("metric.enabled")!=null ){
+ isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled"));
+ }
+ if(isMetricEnabled){
+ initMetric();
+ }
+ LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
+ List<String> out = new ArrayList<String>();
+
+ // Create client once and reuse it on subsequent fetches. This is
+ // to support failover to other servers in the DMaaP cluster.
+ if (client == null) {
+ LOG.info("Getting DMaaP Client ...");
+ client = getClient(waitMs, limit);
+ }
+ try {
+ for (String s : client.fetch(waitMs, limit)) {
+ out.add(s);
+ if(isMetricEnabled){
+ ((DmaapRequestCounterMetric)metricRegistry.metric("DMAAP_KPI")).incrementRecievedMessage();
+ }
+ }
+ LOG.debug(String.format("Got %d records from %s", out.size(), this.toString()));
+ } catch (Exception e) {
+ // Connection exception
+ LOG.error(String.format("Dmaap Connection Issue Detected. %s", e.getMessage()));
+ e.printStackTrace();
+ try {
+ LOG.warn(String.format("Sleeping for %dms to compensate for connection failure", waitMs));
+ Thread.sleep(waitMs);
+ } catch (InterruptedException e2) {
+ LOG.warn(String.format("Failed to wait for %dms after bad fetch", waitMs));
+ }
+ }
+
+
+ return out;
+ }
+
+ /**
+ * Close consumer Dmaap client
+ */
+ @Override
+ public void close() {
+ LOG.debug("Closing Dmaap consumer client....");
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ @Override
+ public List<String> fetch() {
+ return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT);
+ }
+
+ @Override
+ public String toString() {
+ String hostStr = (props == null || props.getProperty("host") == null? "N/A" : props.getProperty("host"));
+ String group = (props == null || props.getProperty("group") == null? "N/A" : props.getProperty("group"));
+ String id = (props == null || props.getProperty("id") == null? "N/A" : props.getProperty("id"));
+ return String.format("Consumer %s/%s listening to %s on [%s]", group, id, topic, hostStr);
+ }
+
+ @Override
+ public void useHttps(boolean yes) {
+ useHttps = yes;
+ }
+
+
+ private MetricService getMetricservice() {
+ BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext();
+ // Get AAIadapter reference
+ ServiceReference sref = bctx.getServiceReference(MetricService.class.getName());
+ if (sref != null) {
+ LOG.info("Metric Service from bundlecontext");
+ return (MetricServiceImpl) bctx.getService(sref);
+
+ } else {
+ LOG.info("Metric Service error from bundlecontext");
+ LOG.warn("Cannot find service reference for org.openecomp.appc.metricservice.MetricService");
+ return null;
+
+ }
+ }
+
+ public Metric getMetric(String name){
+ return metricRegistry.metric(name);
+ }
+}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java
new file mode 100644
index 000000000..79d6b3db7
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java
@@ -0,0 +1,220 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.adapter.messaging.dmaap.impl;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+//import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+//import com.att.nsa.cambria.client.CambriaClientBuilders;
+//import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
+
+import com.att.nsa.mr.client.MRBatchingPublisher;
+import com.att.nsa.mr.client.MRClientFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.openecomp.appc.adapter.message.Producer;
+import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapUtil;
+import org.openecomp.appc.configuration.Configuration;
+import org.openecomp.appc.configuration.ConfigurationFactory;
+import org.openecomp.appc.metricservice.MetricRegistry;
+import org.openecomp.appc.metricservice.MetricService;
+import org.openecomp.appc.metricservice.metric.Metric;
+import org.openecomp.appc.metricservice.metric.MetricType;
+import org.openecomp.appc.metricservice.metric.DmaapRequestCounterMetric;
+import org.openecomp.appc.metricservice.policy.PublishingPolicy;
+import org.openecomp.appc.metricservice.publisher.LogPublisher;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.ServiceReference;
+
+public class DmaapProducerImpl implements Producer {
+
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducerImpl.class);
+ private static final Configuration configuration = ConfigurationFactory.getConfiguration();
+
+ private Set<String> topics = new HashSet<String>();
+
+ private Properties props = null;
+ private static MetricRegistry metricRegistry;
+ private boolean useHttps = false;
+ private DmaapRequestCounterMetric dmaapKpiMetric;
+ private boolean isMetricEnabled=false;
+
+ private Set<MRBatchingPublisher> clients;
+
+
+ public DmaapProducerImpl(Collection<String> urls, String topicName, String user, String password) {
+ this(urls, (Set<String>)null, user, password);
+ this.topics = new HashSet<>();
+ if (topicName != null) {
+ for (String topic : topicName.split(",")) {
+ topics.add(topic);
+ }
+ }
+ }
+
+ public DmaapProducerImpl(Collection<String> urls, Set<String> topicNames, String user, String password) {
+ topics = topicNames;
+ if(urls == null || user == null || password == null){
+ throw new IllegalArgumentException("one of these mandaory argument is null: urls, user, password" );
+ }
+ this.props = new Properties();
+ String urlsStr = StringUtils.join(urls, ',');
+ props.setProperty("host",urlsStr);
+ props.setProperty("id", UUID.randomUUID().toString());
+ props.setProperty("username",user);
+ props.setProperty("password",password);
+ }
+ private void initMetric() {
+ LOG.debug("Metric getting initialized");
+ MetricService metricService = getMetricservice();
+ metricRegistry=metricService.createRegistry("APPC");
+ dmaapKpiMetric = metricRegistry.metricBuilderFactory().
+ dmaapRequestCounterBuilder().
+ withName("DMAAP_KPI").withType(MetricType.COUNTER).
+ withRecievedMessage(0)
+ .withPublishedMessage(0)
+ .build();
+ if(metricRegistry.register(dmaapKpiMetric)) {
+ Metric[] metrics = new Metric[]{dmaapKpiMetric};
+ LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
+ LogPublisher[] logPublishers = new LogPublisher[1];
+ logPublishers[0] = logPublisher;
+ PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory().
+ scheduledPolicyBuilder().withPublishers(logPublishers).
+ withMetrics(metrics).
+ build();
+ LOG.debug("Policy getting initialized");
+ manuallyScheduledPublishingPolicy.init();
+ LOG.debug("Metric initialized");
+ }
+
+ }
+ private Set<MRBatchingPublisher> getClients() {
+ Set<MRBatchingPublisher> out = new HashSet<MRBatchingPublisher>();
+ for (String topic : topics) {
+ try {
+ String topicProducerPropFileName = DmaapUtil.createProducerPropFile(topic,props);
+ final MRBatchingPublisher client = MRClientFactory.createBatchingPublisher (topicProducerPropFileName);
+ out.add(client);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ return out;
+ }
+
+ @Override
+ public synchronized void updateCredentials(String key, String secret) {
+ LOG.info(String.format("Setting auth to %s for %s", key, this.toString()));
+ String user = key;
+ String password = secret;
+ props.setProperty("user",String.valueOf(user));
+ props.setProperty("password",String.valueOf(password));
+ clients = null;
+ }
+
+ @Override
+ public boolean post(String partition, String data) {
+ boolean success = true;
+ Properties properties=configuration.getProperties();
+ if(properties!=null && properties.getProperty("metric.enabled")!=null ){
+ isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled"));
+ }
+ if(isMetricEnabled){
+ initMetric();
+ }
+
+ // Create clients once and reuse them on subsequent posts. This is
+ // to support failover to other servers in the Dmaap cluster.
+ if ((clients == null) || (clients.isEmpty())) {
+ LOG.info("Getting CambriaBatchingPublisher Clients ...");
+ clients = getClients();
+ }
+
+ for (MRBatchingPublisher client : clients) {
+ try {
+ LOG.debug(String.format("Posting %s to %s", data, client));
+ client.send(partition, data);
+ } catch (IOException e) {
+ e.printStackTrace();
+ success = false;
+ }
+ }
+ if(isMetricEnabled){
+ ( (DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementPublishedMessage();
+ }
+ return success;
+ }
+
+ /**
+ * Close producer Dmaap client
+ */
+ @Override
+ public void close() {
+ if ((clients == null) || (clients.isEmpty())) {
+ return;
+ }
+
+ LOG.debug("Closing Dmaap producer clients....");
+ for (MRBatchingPublisher client : clients) {
+ try {
+ client.close(1, TimeUnit.SECONDS);
+ } catch (IOException | InterruptedException e) {
+ LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client));
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ public void useHttps(boolean yes) {
+ useHttps = yes;
+ }
+
+ private MetricService getMetricservice() {
+/*
+ return AppcDmaapAdapterActivator.getMetricService();
+*/
+
+ BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext();
+ ServiceReference sref = bctx.getServiceReference(MetricService.class.getName());
+ if (sref != null) {
+ LOG.info("Metric Service from bundlecontext");
+ return (MetricService) bctx.getService(sref);
+
+ } else {
+ LOG.info("Metric Service error from bundlecontext");
+ LOG.warn("Cannot find service reference for org.openecomp.appc.metricservice.MetricService");
+ return null;
+
+ }
+ }
+
+ public Metric getMetric(String name){
+ return metricRegistry.metric(name);
+ }
+}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java
new file mode 100644
index 000000000..39857b856
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java
@@ -0,0 +1,83 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.adapter.messaging.dmaap.impl;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+public class DmaapUtil {
+ private final static String delimiter = "_";
+ private static String createPreferredRouteFileIfNotExist(String topic) throws IOException {
+ String topicPreferredRouteFileName = null;
+ topicPreferredRouteFileName = topic+"preferredRoute.properties";
+ File fo= new File(topicPreferredRouteFileName);
+ if(!fo.exists()) {
+ ClassLoader classLoader = DmaapUtil.class.getClassLoader();
+ InputStream inputStream = classLoader.getResourceAsStream("preferredRoute.txt");
+ Properties props = new Properties();
+ props.load(inputStream);
+ String fileName = topic != null ? topic+delimiter+"MR1" : delimiter+"MR1";
+ props.setProperty("preferredRouteKey", fileName);
+ topicPreferredRouteFileName = topic + "preferredRoute.properties";
+ props.store(new FileOutputStream(topicPreferredRouteFileName), "preferredRoute.properties file created on the fly for topic:" + topic + " on:" + System.currentTimeMillis());
+ }
+ return topicPreferredRouteFileName;
+ }
+
+ public static String createConsumerPropFile(String topic, Properties props)throws IOException {
+ String defaultProfFileName = "consumer.properties";
+ String topicConsumerPropFileName = createConsumerProducerPropFile(topic, defaultProfFileName,props);
+ return topicConsumerPropFileName;
+ }
+
+ public static String createProducerPropFile(String topic, Properties props)throws IOException {
+ String defaultProfFileName = "producer.properties";
+ String topicConsumerPropFileName = createConsumerProducerPropFile(topic, defaultProfFileName,props);
+ return topicConsumerPropFileName;
+ }
+
+ private static String createConsumerProducerPropFile(String topic, String defaultProfFileName, Properties props) throws IOException {
+ ClassLoader classLoader = DmaapUtil.class.getClassLoader();
+ InputStream inputStream = classLoader.getResourceAsStream(defaultProfFileName);
+ Properties defaultProps = new Properties();
+ defaultProps.load(inputStream);
+ defaultProps.setProperty("topic",topic);
+
+ String preferredRouteFileName = DmaapUtil.createPreferredRouteFileIfNotExist(topic);
+ if(props != null && !props.isEmpty()){
+ defaultProps.putAll(props);
+ }
+ defaultProps.setProperty("topic",topic);
+ defaultProps.setProperty("DME2preferredRouterFilePath",preferredRouteFileName);
+ String id = defaultProps.getProperty("id");
+ String topicConsumerPropFileName = defaultProfFileName;
+ topicConsumerPropFileName = id != null ? id+delimiter+topicConsumerPropFileName : delimiter+topicConsumerPropFileName;
+ topicConsumerPropFileName = topic != null ? topic+delimiter+topicConsumerPropFileName : delimiter+topicConsumerPropFileName;
+
+ defaultProps.store(new FileOutputStream(topicConsumerPropFileName), defaultProfFileName+" file created on the fly for topic:"+topic+" on:"+System.currentTimeMillis());
+ return topicConsumerPropFileName;
+ }
+
+}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/impl/EventSenderImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java
index 0f7d40f5b..c671c6fdb 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/impl/EventSenderImpl.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java
@@ -19,33 +19,33 @@
* ============LICENSE_END=========================================================
*/
-package org.openecomp.appc.adapter.dmaap.impl;
+package org.openecomp.appc.adapter.messaging.dmaap.impl;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.openecomp.sdnc.sli.SvcLogicContext;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
-import org.openecomp.appc.adapter.dmaap.EventSender;
-import org.openecomp.appc.adapter.dmaap.Producer;
-import org.openecomp.appc.adapter.dmaap.DmaapDestination;
-import org.openecomp.appc.adapter.dmaap.event.EventHeader;
-import org.openecomp.appc.adapter.dmaap.event.EventMessage;
-import org.openecomp.appc.adapter.dmaap.event.EventStatus;
-import org.openecomp.appc.adapter.dmaap.DmaapProducer;
+import org.openecomp.appc.adapter.message.EventSender;
+import org.openecomp.appc.adapter.message.MessageDestination;
+import org.openecomp.appc.adapter.message.Producer;
+import org.openecomp.appc.adapter.message.event.EventHeader;
+import org.openecomp.appc.adapter.message.event.EventMessage;
+import org.openecomp.appc.adapter.message.event.EventStatus;
+import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl;
import org.openecomp.appc.configuration.Configuration;
import org.openecomp.appc.configuration.ConfigurationFactory;
import org.openecomp.appc.exceptions.APPCException;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import org.openecomp.sdnc.sli.SvcLogicContext;
-
-public class EventSenderImpl implements EventSender
+public class EventSenderDmaapImpl implements EventSender
{
- private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventSenderImpl.class);
- public static final String EVENT_TOPIC_WRITE = "event.topic.write";
- public static final String EVENT_CLIENT_KEY = "event.client.key";
- public static final String EVENT_CLIENT_SECRET = "event.client.secret";
- public static final String EVENT_POOL_MEMBERS = "event.pool.members";
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventSenderDmaapImpl.class);
+ public static final String EVENT_TOPIC_WRITE = "dmaap.event.topic.write";
+ public static final String DMAAP_USERNAME = "dmaap.appc.username";
+ public static final String DMAAP_PASSWORD = "dmaap.appc.password";
+ public static final String EVENT_POOL_MEMBERS = "dmaap.event.pool.members";
private static Configuration configuration = ConfigurationFactory.getConfiguration();
@@ -59,21 +59,21 @@ public class EventSenderImpl implements EventSender
this.producerMap = producerMap;
}
- public EventSenderImpl(){
+ public EventSenderDmaapImpl(){
}
public void initialize(){
Properties properties = configuration.getProperties();
String writeTopic;
- String apiKey;
- String apiSecret;
+ String username;
+ String password;
final List<String> pool = new ArrayList<>();
- for(DmaapDestination destination:DmaapDestination.values()){
+ for(MessageDestination destination: MessageDestination.values()){
writeTopic = properties.getProperty(destination + "." + EVENT_TOPIC_WRITE);
- apiKey = properties.getProperty(destination + "." + EVENT_CLIENT_KEY);
- apiSecret = properties.getProperty(destination + "." + EVENT_CLIENT_SECRET);
+ username = properties.getProperty(destination + "." + DMAAP_USERNAME);
+ password = properties.getProperty(destination + "." + DMAAP_PASSWORD);
String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS);
if (hostNames != null && !hostNames.isEmpty()) {
@@ -83,12 +83,8 @@ public class EventSenderImpl implements EventSender
LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS));
LOG.debug(String.format("writeTopic = %s, taken from property: %s", writeTopic, destination + "." + EVENT_TOPIC_WRITE));
- LOG.debug(String.format("apiKey = %s, taken from property: %s", apiKey, destination + "." + EVENT_CLIENT_KEY));
- Producer producer = new DmaapProducer(pool, writeTopic);
-
- if (apiKey != null && apiSecret != null) {
- producer.updateCredentials(apiKey, apiSecret);
- }
+ LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME));
+ Producer producer = new DmaapProducerImpl(pool, writeTopic,username, password);
for (String url : pool) {
if (url.contains("3905") || url.contains("https")) {
@@ -103,7 +99,7 @@ public class EventSenderImpl implements EventSender
}
@Override
- public boolean sendEvent(DmaapDestination destination,EventMessage msg) {
+ public boolean sendEvent(MessageDestination destination, EventMessage msg) {
String jsonStr = msg.toJson();
String id = msg.getEventHeader().getEventId();
LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr));
@@ -112,7 +108,43 @@ public class EventSenderImpl implements EventSender
}
@Override
- public boolean sendEvent(DmaapDestination destination,Map<String, String> params, SvcLogicContext ctx) throws APPCException {
+ public boolean sendEvent(MessageDestination destination, EventMessage msg, String eventTopicName) {
+ String jsonStr = msg.toJson();
+ String id = msg.getEventHeader().getEventId();
+ LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr));
+ Producer producer = createProducer(destination, eventTopicName);
+ return producer.post(id, jsonStr);
+ }
+
+ private Producer createProducer(MessageDestination destination, String eventTopicName) {
+ Properties properties = configuration.getProperties();
+ final List<String> pool = new ArrayList<>();
+ String username = properties.getProperty(destination + "." + DMAAP_USERNAME);
+ String password = properties.getProperty(destination + "." + DMAAP_PASSWORD);
+ String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS);
+
+ if (hostNames != null && !hostNames.isEmpty()) {
+ LOG.debug(String.format("hostNames = %s, taken from property: %s", hostNames, destination + "." + EVENT_POOL_MEMBERS));
+ Collections.addAll(pool, hostNames.split(","));
+ }
+
+ LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS));
+ LOG.debug(String.format("writeTopic = %s, taken from property: %s", eventTopicName, destination + "." + EVENT_TOPIC_WRITE));
+ LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME));
+ Producer producer = new DmaapProducerImpl(pool, eventTopicName,username, password);
+
+ for (String url : pool) {
+ if (url.contains("3905") || url.contains("https")) {
+ LOG.debug("Producer should use HTTPS");
+ producer.useHttps(true);
+ break;
+ }
+ }
+ return producer;
+ }
+
+ @Override
+ public boolean sendEvent(MessageDestination destination, Map<String, String> params, SvcLogicContext ctx) throws APPCException {
if (params == null) {
String message = "Parameters map is empty (null)";
@@ -134,10 +166,10 @@ public class EventSenderImpl implements EventSender
LOG.error(message);
throw new APPCException(message);
}
- EventMessage dmaapEventMessage = new EventMessage(
+ EventMessage eventMessage = new EventMessage(
new EventHeader(eventTime, apiVer, eventId),
new EventStatus(code, reason));
- return sendEvent(destination,dmaapEventMessage);
+ return sendEvent(destination,eventMessage);
}
}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index a8caf666f..eefe8e504 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -24,10 +24,10 @@
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd">
- <bean id="eventSenderBean" class="org.openecomp.appc.adapter.dmaap.impl.EventSenderImpl"
+ <bean id="eventSenderBean" class="org.openecomp.appc.adapter.messaging.dmaap.impl.EventSenderDmaapImpl"
init-method="initialize" scope="singleton">
</bean>
- <service id="eventSenderService" interface="org.openecomp.appc.adapter.dmaap.EventSender" ref="eventSenderBean" />
+ <service id="eventSenderService" interface="org.openecomp.appc.adapter.message.EventSender" ref="eventSenderBean" />
</blueprint>
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/consumer.properties b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/consumer.properties
new file mode 100644
index 000000000..facd33e4d
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/consumer.properties
@@ -0,0 +1,57 @@
+###
+# ============LICENSE_START=======================================================
+# openECOMP : APP-C
+# ================================================================================
+# Copyright (C) 2017 AT&T Intellectual Property. All rights
+# reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+
+#TransportType-Specify which way user want to use. I.e. <HTTPAAF,DME2,HTTPAUTH >
+TransportType=HTTPAAF
+Latitude =50.000000
+Longitude =-100.000000
+Version =1.0
+ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.openecomp.org/events
+Environment =TEST
+Partner=BOT_R
+routeOffer=MR1
+SubContextPath =/
+Protocol =http
+MethodType =GET
+username =admin
+password =admin
+contenttype =application/json
+authKey=01234567890abcde:01234567890abcdefghijklmn
+authDate=2016-02-18T13:57:37-0800
+host=127.0.0.1
+topic=org.openecomp.appc.UNIT-TEST
+group=jmsgrp
+id=2
+timeout=15000
+limit=1000
+filter=
+AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler
+AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler
+AFT_DME2_REQ_TRACE_ON=true
+AFT_ENVIRONMENT=AFTUAT
+AFT_DME2_EP_CONN_TIMEOUT=15000
+AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000
+AFT_DME2_EP_READ_TIMEOUT_MS=50000
+sessionstickinessrequired=NO
+DME2preferredRouterFilePath=preferredRoute.txt
+
+
+
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/preferredRoute.txt b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/preferredRoute.txt
new file mode 100644
index 000000000..662b0aa7d
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/preferredRoute.txt
@@ -0,0 +1 @@
+preferredRouteKey=MR1 \ No newline at end of file
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/producer.properties b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/producer.properties
new file mode 100644
index 000000000..4901d517e
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/producer.properties
@@ -0,0 +1,52 @@
+###
+# ============LICENSE_START=======================================================
+# openECOMP : APP-C
+# ================================================================================
+# Copyright (C) 2017 AT&T Intellectual Property. All rights
+# reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+
+#TransportType-Specify which way user want to use. I.e. <HTTPAAF,DME2,HTTPAUTH >
+TransportType=HTTPAAF
+Latitude =50.000000
+Longitude =-100.000000
+Version =1.0
+ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.openecomp.org/events
+Environment =TEST
+Partner=BOT_R
+SubContextPath =/
+Protocol =http
+MethodType =POST
+username =admin
+password =admin
+contenttype = application/json
+authKey=01234567890abcde:01234567890abcdefghijklmn
+authDate=2016-07-20T11:30:56-0700
+host=127.0.0.1
+topic=org.openecomp.appc.UNIT-TEST
+partition=2
+maxBatchSize=100
+maxAgeMs=250
+AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler
+AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler
+AFT_DME2_REQ_TRACE_ON=true
+AFT_ENVIRONMENT=AFTUAT
+AFT_DME2_EP_CONN_TIMEOUT=15000
+AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000
+AFT_DME2_EP_READ_TIMEOUT_MS=50000
+sessionstickinessrequired=NO
+DME2preferredRouterFilePath=preferredRoute.txt
+MessageSentThreadOccurance=50
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/SimpleExamplePublisher.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/SimpleExamplePublisher.java
new file mode 100644
index 000000000..a0ae92ea8
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/SimpleExamplePublisher.java
@@ -0,0 +1,134 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.adapter.messaging.dmaap;
+
+import java.io.*;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.att.nsa.mr.client.MRConsumer;
+import org.json.JSONObject;
+import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapUtil;
+
+import com.att.nsa.mr.client.MRBatchingPublisher;
+import com.att.nsa.mr.client.MRClientFactory;
+import com.att.nsa.mr.client.MRPublisher.message;
+
+
+/**
+ *An example of how to use the Java publisher.
+ */
+public class SimpleExamplePublisher
+{
+
+ public static void main(String []args) throws InterruptedException, Exception{
+ int msgCount = 1;
+ SimpleExamplePublisher publisher = new SimpleExamplePublisher();
+
+ int i=0;
+
+ String topicProducerPropFileName = DmaapUtil.createProducerPropFile("org.openecomp.appc.UNIT-TEST", null);
+ while (i< msgCount)
+ {
+ publisher.publishMessage(topicProducerPropFileName,i);
+ i++;
+ }
+
+ fetchMessage();
+ }
+
+
+ public void publishMessage( String producerFilePath,int count ) throws IOException, InterruptedException, Exception
+ {
+ // create our publisher
+ final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher (producerFilePath);
+ // publish some messages
+ final JSONObject msg1 = new JSONObject ();
+ msg1.put ( "Partition:2", "Message:" +count);
+ //msg1.put ( "greeting", "Hello .." );
+
+ pub.send ( "2", msg1.toString());
+ // close the publisher to make sure everything's sent before exiting. The batching
+ // publisher interface allows the app to get the set of unsent messages. It could
+ // write them to disk, for example, to try to send them later.
+ final List<message> stuck = pub.close ( 20, TimeUnit.SECONDS );
+ if ( stuck.size () > 0 )
+ {
+ System.err.println ( stuck.size() + " messages unsent" );
+ }
+ else
+ {
+ System.out.println ( "Clean exit; all messages sent." );
+ }
+ }
+
+
+ public static void fetchMessage()
+ {
+ int count = 0;
+
+ try
+ {
+ String topic = "org.openecomp.appc.UNIT-TEST";
+ Properties props = new Properties();
+ props.put("id", "1");
+ props.put("group", "group1");
+ String topicConsumerPropFileName1 = DmaapUtil.createConsumerPropFile(topic,props);
+ final MRConsumer consumer1 = MRClientFactory.createConsumer ( topicConsumerPropFileName1);
+
+ props = new Properties();
+ props.put("id", "2");
+ props.put("group", "group2");
+ String topicConsumerPropFileName2 = DmaapUtil.createConsumerPropFile(topic,props);
+ final MRConsumer consumer2 = MRClientFactory.createConsumer ( topicConsumerPropFileName2);
+
+ for ( String msg : consumer1.fetch () )
+ {
+ count++;
+ System.out.println ( "consumer1 "+count + ": " + msg );
+ }
+ for ( String msg : consumer2.fetch () )
+ {
+ count++;
+ System.out.println ( "consumer1 "+count + ": " + msg );
+ }
+
+
+ }
+ catch ( Exception x )
+ {
+ System.out.println("inside cons exc");
+ System.err.println ( x.getClass().getName () + ": " + x.getMessage () );
+ }
+ }
+}
+
+
+
+
+
+
+
+
+
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/dmaap/TestAppcDmaapAdapterActivator.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestAppcDmaapAdapterActivator.java
index e97014198..6626938a7 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/dmaap/TestAppcDmaapAdapterActivator.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestAppcDmaapAdapterActivator.java
@@ -19,13 +19,13 @@
* ============LICENSE_END=========================================================
*/
-package org.openecomp.appc.adapter.dmaap;
+package org.openecomp.appc.adapter.messaging.dmaap;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import org.junit.Test;
-import org.openecomp.appc.adapter.dmaap.AppcDmaapAdapterActivator;
+import org.openecomp.appc.adapter.messaging.dmaap.AppcDmaapAdapterActivator;
public class TestAppcDmaapAdapterActivator {
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapConsuming.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapConsuming.java
new file mode 100644
index 000000000..23647d61a
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapConsuming.java
@@ -0,0 +1,84 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.adapter.messaging.dmaap;
+
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.openecomp.appc.adapter.message.Consumer;
+import org.openecomp.appc.adapter.messaging.dmaap.http.HttpDmaapConsumerImpl;
+import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapConsumerImpl;
+import org.openecomp.appc.configuration.Configuration;
+import org.openecomp.appc.configuration.ConfigurationFactory;
+import org.junit.Ignore;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Must have a DMaaP cluster or simulator up and running
+ * Update the hostname, topic, client properties in
+ * resources/org/openecomp/appc/default.properties
+ *
+ */
+public class TestDmaapConsuming {
+
+ private static Consumer dmaapConsumer;
+ private static Consumer httpConsumer;
+
+ @BeforeClass
+ public static void setUp() {
+
+ Configuration configuration = ConfigurationFactory.getConfiguration();
+
+ List<String> hosts = Arrays.asList(configuration.getProperty("poolMembers").split(","));
+ String topic = configuration.getProperty("topic.read");
+ String consumerName = configuration.getProperty("client.name");
+ String consumerId = configuration.getProperty("client.name.id");
+ String msgFilter = configuration.getProperty("message.filter");
+ String user = configuration.getProperty("dmaap.appc.username");
+ String password = configuration.getProperty("dmaap.appc.password");
+
+ httpConsumer = new HttpDmaapConsumerImpl(hosts, topic, consumerName, consumerId, msgFilter);
+ dmaapConsumer = new DmaapConsumerImpl(hosts, topic, consumerName, consumerId,user,password,msgFilter);
+ }
+
+ @Test
+ @Ignore
+ public void testHttpFetchMessages() {
+ testFetchMessages(httpConsumer);
+ }
+
+ @Test
+ @Ignore
+ public void testFetchMessages() {
+ testFetchMessages(dmaapConsumer);
+ }
+
+ private void testFetchMessages(Consumer consumer) {
+ List<String> messages = consumer.fetch(1000, 100);
+ Assert.assertNotNull(messages);
+ Assert.assertFalse(messages.isEmpty());
+ }
+
+}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapEventSender.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapEventSender.java
new file mode 100644
index 000000000..b35f9871d
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapEventSender.java
@@ -0,0 +1,169 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.adapter.messaging.dmaap;
+
+import org.openecomp.sdnc.sli.SvcLogicContext;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.openecomp.appc.adapter.message.MessageDestination;
+import org.openecomp.appc.adapter.message.Producer;
+import org.openecomp.appc.adapter.message.event.EventHeader;
+import org.openecomp.appc.adapter.message.event.EventMessage;
+import org.openecomp.appc.adapter.message.event.EventStatus;
+import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl;
+import org.openecomp.appc.adapter.messaging.dmaap.impl.EventSenderDmaapImpl;
+import org.openecomp.appc.configuration.Configuration;
+import org.openecomp.appc.configuration.ConfigurationFactory;
+import org.openecomp.appc.exceptions.APPCException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+
+public class TestDmaapEventSender {
+
+ private static Properties props;
+ private static Map<String,Producer> producerMap = new HashMap<>();
+ private static EventMessage eventMessage;
+
+ @BeforeClass
+ public static void setUp() {
+
+ Configuration configuration = ConfigurationFactory.getConfiguration(); // test.properties file placed in home dir.
+
+ props = new Properties();
+ props.setProperty(EventSenderDmaapImpl.EVENT_POOL_MEMBERS,
+ configuration.getProperty(EventSenderDmaapImpl.EVENT_POOL_MEMBERS) != null ?
+ configuration.getProperty(EventSenderDmaapImpl.EVENT_POOL_MEMBERS) : "member1,member2,member3");
+ props.setProperty(EventSenderDmaapImpl.EVENT_TOPIC_WRITE,
+ configuration.getProperty(EventSenderDmaapImpl.EVENT_TOPIC_WRITE) != null ?
+ configuration.getProperty(EventSenderDmaapImpl.EVENT_TOPIC_WRITE) : "topic1");
+
+ String eventClientKey = configuration.getProperty(EventSenderDmaapImpl.DMAAP_USERNAME);
+ if (eventClientKey != null) {
+ props.setProperty(EventSenderDmaapImpl.DMAAP_USERNAME,eventClientKey);
+ }
+ String eventClientSecret = configuration.getProperty(EventSenderDmaapImpl.DMAAP_PASSWORD);
+ if (eventClientSecret != null) {
+ props.setProperty(EventSenderDmaapImpl.DMAAP_PASSWORD, eventClientSecret);
+ }
+
+ Producer producer = Mockito.mock(DmaapProducerImpl.class);
+ producerMap.put(MessageDestination.DCAE.toString(),producer);
+ Mockito.when(producer.post(Matchers.anyString(), Matchers.anyString())).thenReturn(true);
+
+ eventMessage = new EventMessage(
+ new EventHeader("2016-03-15T10:59:33.79Z", "1.01", "17"),
+ new EventStatus(404, "No krokodil found"));
+ }
+
+ @Test
+ @Ignore // requires connection to a live DMaaP server
+ public void testDmaapEventSenderWithProperties() {
+ EventSenderDmaapImpl eventSender = new EventSenderDmaapImpl();
+ eventSender.initialize();
+ eventSender.setProducerMap(producerMap);
+ Assert.assertTrue(eventSender.sendEvent(MessageDestination.DCAE, eventMessage));
+ }
+
+ @Test
+ public void testDmaapEventSenderWithNullProperties() {
+ EventSenderDmaapImpl eventSender = new EventSenderDmaapImpl();
+// eventSender.initialize();
+ eventSender.setProducerMap(producerMap);
+ Assert.assertTrue(eventSender.sendEvent(MessageDestination.DCAE, eventMessage));
+ }
+
+ /*
+ * This test runs agains a real Dmaap (or a simulator) that should be cofigured in test.properties file.
+ */
+ @Test
+ @Ignore // requires connection to a live DMaaP server
+ public void testDmaapEventSenderWithDmaapSim() {
+ EventSenderDmaapImpl eventSender = new EventSenderDmaapImpl();
+ eventSender.initialize();
+ Assert.assertTrue(eventSender.sendEvent(MessageDestination.DCAE, eventMessage));
+ }
+
+
+ @Test
+ @Ignore // requires connection to a live DMaaP server
+ public void testDmaapEventSenderDG() throws APPCException {
+ EventSenderDmaapImpl eventSender = new EventSenderDmaapImpl();
+ eventSender.initialize();
+ eventSender.setProducerMap(producerMap);
+ Map<String,String> params = new HashMap<>();
+
+ params.put("eventTime", eventMessage.getEventHeader().getEventTime());
+ params.put("apiVer", eventMessage.getEventHeader().getApiVer());
+ params.put("eventId", eventMessage.getEventHeader().getEventId());
+ params.put("reason", eventMessage.getEventStatus().getReason());
+ params.put("code", "200");
+
+ Assert.assertTrue(eventSender.sendEvent(MessageDestination.DCAE,params, new SvcLogicContext()));
+ }
+
+ @Test(expected = APPCException.class)
+ @Ignore // requires connection to a live DMaaP server
+ public void testDmaapEventSenderDGNoParams() throws APPCException {
+ EventSenderDmaapImpl eventSender = new EventSenderDmaapImpl();
+ eventSender.initialize();
+ eventSender.setProducerMap(producerMap);
+ Map<String,String> params = new HashMap<>();
+
+ Assert.assertFalse(eventSender.sendEvent(MessageDestination.DCAE,params, new SvcLogicContext()));
+ }
+
+
+ @Test(expected = APPCException.class)
+ @Ignore // requires connection to a live DMaaP server
+ public void testDmaapEventSenderDGNullParam() throws APPCException {
+ EventSenderDmaapImpl eventSender = new EventSenderDmaapImpl();
+ eventSender.initialize();
+ eventSender.setProducerMap(producerMap);
+ Map<String,String> params = null;
+
+ Assert.assertFalse(eventSender.sendEvent(MessageDestination.DCAE,params, new SvcLogicContext()));
+ }
+
+ @Test(expected = APPCException.class)
+ @Ignore // requires connection to a live DMaaP server
+ public void testDmaapEventSenderDGNoParam() throws APPCException {
+ EventSenderDmaapImpl eventSender = new EventSenderDmaapImpl();
+ eventSender.initialize();
+ eventSender.setProducerMap(producerMap);
+ Map<String,String> params = new HashMap<>();
+
+// params.put("apiVer", eventMessage.getEventHeader().getApiVer());
+ params.put("eventId", eventMessage.getEventHeader().getEventId());
+ params.put("reason", eventMessage.getEventStatus().getReason());
+ params.put("code", "200");
+
+ eventSender.sendEvent(MessageDestination.DCAE,params, new SvcLogicContext());
+ }
+
+}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapProducing.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapProducing.java
new file mode 100644
index 000000000..b5b7c9538
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapProducing.java
@@ -0,0 +1,80 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.adapter.messaging.dmaap;
+
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.openecomp.appc.adapter.message.Producer;
+import org.openecomp.appc.adapter.messaging.dmaap.http.HttpDmaapProducerImpl;
+import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl;
+import org.openecomp.appc.configuration.Configuration;
+import org.openecomp.appc.configuration.ConfigurationFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Must have a DMaaP cluster or simulator up and running
+ * Update the hostname, topic, client properties in
+ * resources/org/openecomp/appc/default.properties
+ *
+ */
+public class TestDmaapProducing {
+
+ private static Producer httpProducer;
+ private static Producer dmaapProducer;
+
+ @BeforeClass
+ public static void setUp() {
+
+ Configuration configuration = ConfigurationFactory.getConfiguration();
+
+ List<String> hosts = Arrays.asList(configuration.getProperty("poolMembers").split(","));
+ String topic = configuration.getProperty("topic.write");
+ String user = configuration.getProperty("dmaap.appc.username");
+ String password = configuration.getProperty("dmaap.appc.password");
+
+ dmaapProducer = new DmaapProducerImpl(hosts, topic,user,password);
+ httpProducer = new HttpDmaapProducerImpl(hosts, topic);
+ httpProducer.updateCredentials(user,password);
+ }
+
+ @Test
+ @Ignore
+ public void testHttpPostMessage() {
+ testPostMessage(httpProducer);
+ }
+
+ @Test
+ @Ignore
+ public void testPostMessages() {
+ testPostMessage(dmaapProducer);
+ }
+
+ private void testPostMessage(Producer producer) {
+ Assert.assertTrue(producer.post("partition", "{\"message\": \"Hello, world!\"}"));
+ }
+
+}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/impl/TestConsumerProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/impl/TestConsumerProducerImpl.java
new file mode 100644
index 000000000..4b936351a
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/impl/TestConsumerProducerImpl.java
@@ -0,0 +1,242 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.adapter.messaging.dmaap.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.*;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.openecomp.appc.adapter.message.Consumer;
+import org.openecomp.appc.adapter.message.Producer;
+import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapConsumerImpl;
+import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl;
+import org.openecomp.appc.configuration.Configuration;
+import org.openecomp.appc.configuration.ConfigurationFactory;
+
+public class TestConsumerProducerImpl {
+
+ private Collection<String> urls;
+ private String topicRead;
+ private String topicWrite;
+ private String group;
+ private String groupId;
+ private String user;
+ private String password;
+
+ @Before
+ public void setup() {
+ System.out.println("setup entry...");
+// urls = new HashSet<String>();
+// urls.add("dmaaphost1");
+// urls.add("dmaaphost2");
+// //remove unavailable dmaap instance for build
+// //urls.add("dmaaphost3");
+//
+// topicRead = "APPC-UNIT-TEST";
+// topicWrite = "APPC-UNIT-TEST";
+// group = "APPC-CLIENT";
+// groupId = "0";
+ Configuration configuration = ConfigurationFactory.getConfiguration();
+ List<String> hosts = Arrays.asList(configuration.getProperty("poolMembers").split(","));
+ urls = new HashSet<String>(hosts);
+ topicRead = configuration.getProperty("topic.read");
+ topicWrite = configuration.getProperty("topic.write");
+ user = configuration.getProperty("dmaap.appc.username");
+ password = configuration.getProperty("dmaap.appc.password");
+ group = "APPC-CLIENT";
+ groupId = "0";
+
+
+ runoff();
+ }
+
+ /**
+ * Test that we can read and write and that the messages come back in order
+ */
+ @Ignore
+ @Test
+ public void testWriteRead() {
+ System.out.println("testWriteRead entry...");
+ Producer p = new DmaapProducerImpl(urls, topicWrite,user,password);
+
+ String s1 = UUID.randomUUID().toString();
+ String s2 = UUID.randomUUID().toString();
+ if (p.post("TEST", s1) == false) {
+ // try again - 2nd attempt may succeed if cambria client failed over
+ p.post("TEST", s1);
+ }
+ if (p.post("TEST", s2) == false) {
+ // try again - 2nd attempt may succeed if cambria client failed over
+ p.post("TEST", s2);
+ }
+
+ Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password);
+ List<String> out = c.fetch();
+ // if fetch is empty, try again - a 2nd attempt may succeed if
+ // cambria client has failed over
+ if ((out == null) || out.isEmpty()) {
+ out = c.fetch();
+ }
+
+ assertNotNull(out);
+ assertEquals(2, out.size());
+ assertEquals(s1, out.get(0));
+ assertEquals(s2, out.get(1));
+
+ }
+
+ /**
+ * Test that we can read and write and that the messages come back in order
+ */
+ @Test
+ @Ignore // Https Not support on jenkins server
+ public void testWriteReadHttps() {
+ System.out.println("testWriteReadHttps entry...");
+ Producer p = new DmaapProducerImpl(urls, topicWrite,user,password);
+ p.useHttps(true);
+
+ String s1 = UUID.randomUUID().toString();
+ String s2 = UUID.randomUUID().toString();
+ if (p.post("TEST", s1) == false) {
+ // try again - 2nd attempt may succeed if cambria client failed over
+ p.post("TEST", s1);
+ }
+ if (p.post("TEST", s2) == false) {
+ // try again - 2nd attempt may succeed if cambria client failed over
+ p.post("TEST", s2);
+ }
+
+ Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password);
+ c.useHttps(true);
+
+ List<String> out = c.fetch();
+ // if fetch is empty, try again - a 2nd attempt may succeed if
+ // cambria client has failed over
+ if ((out == null) || out.isEmpty()) {
+ out = c.fetch();
+ }
+
+ assertNotNull(out);
+ assertEquals(2, out.size());
+ assertEquals(s1, out.get(0));
+ assertEquals(s2, out.get(1));
+
+ }
+
+ @Test
+ @Ignore // requires connection to a live DMaaP server
+ public void testBadUrl() {
+ System.out.println("testBadUrl entry...");
+ urls.clear();
+ urls.add("something.local");
+
+ // Producer p = new DmaapProducerImpl(urls, topicWrite);
+ Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password);
+ List<String> result = c.fetch(1000, 1000);
+
+ assertNotNull(result);
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ @Ignore // requires connection to a live DMaaP server
+ public void testAuth() {
+ System.out.println("testAuth entry...");
+ Producer p = new DmaapProducerImpl(urls, topicWrite,user,password);
+ Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password);
+
+ p.updateCredentials("key", "secret");
+ c.updateCredentials("key", "secret");
+
+ // TODO - Do some protected dmaap queries when the apis are updated
+ }
+
+ /**
+ * Test DMaaP client failover to another server when a bad url is encountered
+
+ */
+ @Ignore
+ @Test
+ public void testFailover() {
+ System.out.println("testFailover entry...");
+ urls.clear();
+ urls.add("openecomp2.org"); // bad url
+ urls.add("dmaaphost2");
+ Producer p = new DmaapProducerImpl(urls, topicWrite,user,password);
+
+ String s1 = UUID.randomUUID().toString();
+ if (p.post("TEST", s1) == false) {
+ // try again - cambria client should have failed over
+ p.post("TEST", s1);
+ }
+
+ urls.clear();
+ urls.add("openecomp3.org"); // bad url
+ urls.add("dmaaphost3");
+
+ Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password);
+ List<String> out = c.fetch(1000, 1000);
+ // if fetch is empty, try again - cambria client should have failed over
+ if ((out == null) || out.isEmpty()) {
+ out = c.fetch();
+ }
+
+ assertNotNull(out);
+ assertEquals(1, out.size());
+ assertEquals(s1, out.get(0));
+ }
+
+ /**
+ * Reads through the entire topic so it is clean for testing. WARNING - ONLY USE ON TOPICS WHERE YOU ARE THE ONLY
+ * WRITER. Could end in an infinite loop otherwise.
+ */
+ private void runoff() {
+ Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password);
+ List<String> data;
+ do {
+ data = c.fetch(1000, 10000);
+ } while (!data.isEmpty() && data.size()!=1);
+ }
+
+ @Test
+ @Ignore
+ public void testFilter() {
+ System.out.println("testFilter entry...");
+ List<String> res;
+ String filter = "{\"class\":\"Assigned\",\"field\":\"request\"}";
+ Consumer c = new DmaapConsumerImpl(urls, "DCAE-CLOSED-LOOP-EVENTS-DEV1510SIM", group, groupId,user,password,filter);
+ res = c.fetch(2000, 10);
+ assertFalse(res.isEmpty());
+
+ res.clear();
+ filter = "{\"class\":\"Assigned\",\"field\":\"response\"}";
+ c = new DmaapConsumerImpl(urls, "DCAE-CLOSED-LOOP-EVENTS-DEV1510SIM", group, groupId,user,password, filter);
+ res = c.fetch(2000, 10);
+ assertTrue(res.isEmpty());
+ }
+}