diff options
author | Patrick Brady <pb071s@att.com> | 2017-06-01 10:45:37 -0700 |
---|---|---|
committer | Patrick Brady <pb071s@att.com> | 2017-06-02 13:05:15 -0700 |
commit | c7d0075d223eab9f89fd28853c4b138792059be9 (patch) | |
tree | 40aa3e41e598ea7a59bcf6899a2004c1abab11c2 /appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java | |
parent | 8aac2df744820304ee29354333661699e9695939 (diff) |
Merge of new rebased code
Change-Id: I9b8d1f69eb3e0af1935ed8304fea4bf54c1aac47
Signed-off-by: Patrick Brady <pb071s@att.com>
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java')
17 files changed, 627 insertions, 555 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CallableConsumer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CallableConsumer.java deleted file mode 100644 index 7c282911d..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CallableConsumer.java +++ /dev/null @@ -1,58 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * openECOMP : APP-C - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.appc.adapter.dmaap; - -import java.util.List; -import java.util.concurrent.Callable; - -public class CallableConsumer implements Callable<List<String>> { - - private Consumer consumer; - - private int timeout = 15000; - private int limit = 1000; - - public CallableConsumer(Consumer c) { - this.consumer = c; - } - - public CallableConsumer(Consumer c, int waitMs, int fetchSize) { - this.consumer = c; - this.timeout = waitMs; - this.limit = fetchSize; - } - - @Override - public List<String> call() { - return consumer.fetch(timeout, limit); - } - - /** - * The maximum amount of time to keep a connection alive. Currently is set to waitMs + 10s - * - * @return An integer representing the maximum amount of time to keep this thread alive - */ - public int getMaxLife() { - return 10000 + timeout; - } - -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Consumer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Consumer.java deleted file mode 100644 index 32034e5fb..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Consumer.java +++ /dev/null @@ -1,68 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * openECOMP : APP-C - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.appc.adapter.dmaap; - -import java.util.List; - -public interface Consumer { - - /** - * Gets a batch of messages from the topic. Defaults to 1000 messages with 15s wait for messages if empty. - * - * @return A list of strings representing the messages pulled from the topic. - */ - public List<String> fetch(); - - /** - * Gets a batch of messages from the topic. - * - * @param waitMs - * The amount of time to wait in milliseconds if the topic is empty for data to be written. Should be no - * less than 15000ms to prevent too many requests - * @param limit - * The amount of messages to fetch - * @return A list of strings representing the messages pulled from the topic. - */ - public List<String> fetch(int waitMs, int limit); - - /** - * Updates the api credentials for making authenticated requests - * - * @param apiKey - * The public key to authenticate with - * @param apiSecret - * The secret key to authenticate with - */ - public void updateCredentials(String apiKey, String apiSecret); - - // TODO - Implement once Cambria allows you to set outside of constructor - // public void setFilter(String filter); - - /** - * Creates a dmaap client using a https connection - * - * @param yes - * True if https should be used, false otherwise - */ - public void useHttps(boolean yes); - -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapDestination.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapDestination.java deleted file mode 100644 index efbe194ba..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapDestination.java +++ /dev/null @@ -1,26 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * openECOMP : APP-C - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.appc.adapter.dmaap; - -public enum DmaapDestination { - DCAE -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/EventSender.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/EventSender.java deleted file mode 100644 index 7d4a7c090..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/EventSender.java +++ /dev/null @@ -1,35 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * openECOMP : APP-C - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.appc.adapter.dmaap; - -import java.util.Map; - -import org.openecomp.appc.adapter.dmaap.event.EventMessage; -import org.openecomp.appc.exceptions.APPCException; -import org.openecomp.sdnc.sli.SvcLogicContext; -import org.openecomp.sdnc.sli.SvcLogicJavaPlugin; - - -public interface EventSender extends SvcLogicJavaPlugin{ - boolean sendEvent(DmaapDestination destination, EventMessage msg); - boolean sendEvent(DmaapDestination destination, Map<String, String> params, SvcLogicContext ctx) throws APPCException; -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Manager.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Manager.java deleted file mode 100644 index 183e618ba..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Manager.java +++ /dev/null @@ -1,45 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * openECOMP : APP-C - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.appc.adapter.dmaap; - -import java.util.Set; - -public interface Manager { - - /** - * Updates the api credentials for making authenticated requests - * - * @param apiKey - * The public key to authenticate with - * @param apiSecret - * The secret key to authenticate with - */ - public void updateCredentials(String apiKey, String apiSecret); - - /** - * Return a set of strings representing topics that the user can see - * - * @return A set of strings with topic names or an empty set if no topics are visible - */ - public Set<String> getTopics(); - -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Producer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Producer.java deleted file mode 100644 index f19c516be..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Producer.java +++ /dev/null @@ -1,46 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * openECOMP : APP-C - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.appc.adapter.dmaap; - -public interface Producer { - - public boolean post(String partition, String data); - - /** - * Updates the api credentials for making authenticated requests - * - * @param apiKey - * The public key to authenticate with - * @param apiSecret - * The secret key to authenticate with - */ - public void updateCredentials(String apiKey, String apiSecret); - - /** - * Creates a dmaap client using a https connection - * - * @param yes - * True if https should be used, false otherwise - */ - public void useHttps(boolean yes); - -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventHeader.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventHeader.java deleted file mode 100644 index dd951fe37..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventHeader.java +++ /dev/null @@ -1,64 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * openECOMP : APP-C - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.appc.adapter.dmaap.event; - -import com.fasterxml.jackson.annotation.JsonProperty; - - -public class EventHeader { - - @JsonProperty("eventTime") - private final String eventTime; - - @JsonProperty("apiVer") - private final String apiVer; - - @JsonProperty("eventId") - private final String eventId; - - public EventHeader(String eventTime, String apiVer, String eventId) { - this.eventTime = eventTime; - this.apiVer = apiVer; - this.eventId = eventId; - } - - public String getEventTime() { - return eventTime; - } - - public String getApiVer() { - return apiVer; - } - - public String getEventId() { - return eventId; - } - - @Override - public String toString() { - return "EventHeader{" + - "eventTime='" + eventTime + '\'' + - ", apiVer='" + apiVer + '\'' + - ", eventId='" + eventId + '\'' + - '}'; - } -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventMessage.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventMessage.java deleted file mode 100644 index af5cff2f9..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventMessage.java +++ /dev/null @@ -1,99 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * openECOMP : APP-C - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.appc.adapter.dmaap.event; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize.Inclusion; - -import java.io.IOException; -import java.io.Serializable; - -/* - { - "EventHeader": { - "eventTime": "2016-03-15T10:59:33.79Z", - "apiVer": "1.01", - "EventId": "<ECOMP_EVENT_ID>", - }, - "EventStatus": { - "code": "NNN", - "reason": "A reason" - } - } -*/ - - - - -@JsonSerialize(include = Inclusion.NON_NULL) -@JsonIgnoreProperties(ignoreUnknown = true) -public class EventMessage implements Serializable { - - private static final long serialVersionUID = 1L; - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - @JsonProperty("eventHeader") - private EventHeader eventHeader; - @JsonProperty("eventStatus") - private EventStatus eventStatus; - - public EventMessage(EventHeader eventHeader, EventStatus eventStatus) { - this.eventHeader = eventHeader; - this.eventStatus = eventStatus; - } - - public EventHeader getEventHeader() { - return eventHeader; - } - - public void setEventHeader(EventHeader eventHeader) { - this.eventHeader = eventHeader; - } - - public EventStatus getEventStatus() { - return eventStatus; - } - - public void setEventStatus(EventStatus eventStatus) { - this.eventStatus = eventStatus; - } - - public String toJson() { - try { - return OBJECT_MAPPER.writeValueAsString(this); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public String toString() { - return "EventMessage{" + - "eventHeader=" + eventHeader + - ", eventStatus=" + eventStatus + - '}'; - } -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventStatus.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventStatus.java deleted file mode 100644 index f5d7a59d4..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventStatus.java +++ /dev/null @@ -1,56 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * openECOMP : APP-C - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.appc.adapter.dmaap.event; - -import com.fasterxml.jackson.annotation.JsonProperty; - - -public class EventStatus { - - @JsonProperty("code") - private final Integer code; - - @JsonProperty("reason") - private final String reason; - - public EventStatus(Integer code, String aReason) { - this.code = code; - reason = aReason; - } - - - public Integer getCode() { - return code; - } - - public String getReason() { - return reason; - } - - @Override - public String toString() { - return "EventStatus{" + - "code=" + code + - ", reason='" + reason + '\'' + - '}'; - } -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/AppcDmaapAdapterActivator.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java index c02553dfe..c7be330cf 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/AppcDmaapAdapterActivator.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java @@ -19,7 +19,7 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap; +package org.openecomp.appc.adapter.messaging.dmaap; import org.openecomp.appc.configuration.ConfigurationFactory; import com.att.eelf.configuration.EELFLogger; diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CommonHttpClient.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/CommonHttpClient.java index 654ec6f7f..0d0450681 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CommonHttpClient.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/CommonHttpClient.java @@ -19,7 +19,7 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap; +package org.openecomp.appc.adapter.messaging.dmaap.http; import java.net.URI; diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapConsumer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java index 6e16d896b..2145eaa70 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapConsumer.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java @@ -19,14 +19,13 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap; +package org.openecomp.appc.adapter.messaging.dmaap.http; import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.List; - import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; @@ -38,10 +37,11 @@ import org.apache.http.client.utils.URIBuilder; import org.apache.http.message.BasicNameValuePair; import org.apache.http.util.EntityUtils; import org.json.JSONArray; +import org.openecomp.appc.adapter.message.Consumer; -public class DmaapConsumer extends CommonHttpClient implements Consumer { +public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer { - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumer.class); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class); // Default values private static final int DEFAULT_TIMEOUT_MS = 15000; @@ -54,17 +54,17 @@ public class DmaapConsumer extends CommonHttpClient implements Consumer { private boolean useHttps = false; - public DmaapConsumer(Collection<String> hosts, String topicName, String consumerName, String consumerId) { + public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId) { this(hosts, topicName, consumerName, consumerId, null); } - public DmaapConsumer(Collection<String> hosts, String topicName, String consumerName, String consumerId, - String filter) { + public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId, + String filter) { this(hosts, topicName, consumerName, consumerId, filter, null, null); } - public DmaapConsumer(Collection<String> hosts, String topicName, String consumerName, String consumerId, - String filter, String user, String password) { + public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId, + String filter, String user, String password) { urls = new ArrayList<String>(); for (String host : hosts) { urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId)); @@ -155,9 +155,10 @@ public class DmaapConsumer extends CommonHttpClient implements Consumer { LOG.error("Interrupted while sleeping"); } } - - public void close(){ - //not used yet - } + + @Override + public void close() { + // Nothing to do + } } diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapProducer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java index 6845177b1..85e446dca 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapProducer.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java @@ -19,7 +19,7 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap; +package org.openecomp.appc.adapter.messaging.dmaap.http; import java.net.URI; import java.util.ArrayList; @@ -34,10 +34,11 @@ import com.att.eelf.configuration.EELFManager; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; +import org.openecomp.appc.adapter.message.Producer; -public class DmaapProducer extends CommonHttpClient implements Producer { +public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer { - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducer.class); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapProducerImpl.class); private static final String CONTENT_TYPE = "application/cambria"; private static final String URL_TEMPLATE = "%s/events/%s"; @@ -47,7 +48,7 @@ public class DmaapProducer extends CommonHttpClient implements Producer { private boolean useHttps = false; - public DmaapProducer(Collection<String> urls, String topicName) { + public HttpDmaapProducerImpl(Collection<String> urls, String topicName) { hosts = new ArrayList<String>(); topics = new HashSet<String>(); topics.add(topicName); @@ -57,7 +58,7 @@ public class DmaapProducer extends CommonHttpClient implements Producer { } } - public DmaapProducer(Collection<String> urls, Set<String> topicNames) { + public HttpDmaapProducerImpl(Collection<String> urls, Set<String> topicNames) { hosts = new ArrayList<String>(); topics = topicNames; @@ -126,8 +127,9 @@ public class DmaapProducer extends CommonHttpClient implements Producer { String m = (msg == null) ? "" : msg; return String.format("%d.%d.%s%s", p.length(), m.length(), p, m); } - - public void close(){ - //not used yet - } + + @Override + public void close() { + // Nothing to do + } } diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java new file mode 100644 index 000000000..342d52448 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java @@ -0,0 +1,231 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.messaging.dmaap.impl; + +import java.io.IOException; +import java.util.*; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +//import com.att.nsa.cambria.client.CambriaClientBuilders; +//import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; +//import com.att.nsa.cambria.client.CambriaConsumer; + +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; +import org.apache.commons.lang3.StringUtils; +import org.openecomp.appc.adapter.message.Consumer; +import org.openecomp.appc.configuration.Configuration; +import org.openecomp.appc.configuration.ConfigurationFactory; +import org.openecomp.appc.metricservice.MetricRegistry; +import org.openecomp.appc.metricservice.MetricService; +import org.openecomp.appc.metricservice.impl.MetricServiceImpl; +import org.openecomp.appc.metricservice.metric.Metric; +import org.openecomp.appc.metricservice.metric.MetricType; +import org.openecomp.appc.metricservice.metric.DmaapRequestCounterMetric; +import org.openecomp.appc.metricservice.policy.PublishingPolicy; +import org.openecomp.appc.metricservice.publisher.LogPublisher; +import org.osgi.framework.BundleContext; +import org.osgi.framework.FrameworkUtil; +import org.osgi.framework.ServiceReference; + +public class DmaapConsumerImpl implements Consumer { + + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class); + private static final Configuration configuration = ConfigurationFactory.getConfiguration(); + // Default values + private static final int DEFAULT_TIMEOUT_MS = 60000; + private static final int DEFAULT_LIMIT = 1000; + private static MetricRegistry metricRegistry; + private String topic; + private DmaapRequestCounterMetric dmaapKpiMetric; + private boolean isMetricEnabled=false; + private boolean useHttps = false; + private MRConsumer client = null; + private Properties props = null; + + + public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,String user, String password) { + this(urls, topicName, consumerGroupName, consumerId,user, password,null); + + } + + public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,String user, String password,String filter) { + this.topic = topicName; + this.props = new Properties(); + String urlsStr = StringUtils.join(urls, ','); + props.setProperty("host",urlsStr); + props.setProperty("group",consumerGroupName); + props.setProperty("id",consumerId); + props.setProperty("username",user); + props.setProperty("password",password); + if(filter != null) { + props.setProperty("filter", filter); + } + } + + + private void initMetric() { + LOG.debug("Metric getting initialized"); + MetricService metricService = getMetricservice(); + metricRegistry = metricService.createRegistry("APPC"); + dmaapKpiMetric = metricRegistry.metricBuilderFactory(). + dmaapRequestCounterBuilder(). + withName("DMAAP_KPI").withType(MetricType.COUNTER). + withRecievedMessage(0) + .withPublishedMessage(0) + .build(); + if (metricRegistry.register(dmaapKpiMetric)) { + Metric[] metrics = new Metric[]{dmaapKpiMetric}; + LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics); + LogPublisher[] logPublishers = new LogPublisher[1]; + logPublishers[0] = logPublisher; + PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory(). + scheduledPolicyBuilder().withPublishers(logPublishers). + withMetrics(metrics). + build(); + LOG.debug("Policy getting initialized"); + manuallyScheduledPublishingPolicy.init(); + LOG.debug("Metric initialized"); + } + } + private MRConsumer getClient() { + return getClient(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT); + } + + /** + * @return An instance of MRConsumer created from our class variables + */ + private synchronized MRConsumer getClient(int waitMs, int limit) { + try { + props.setProperty("timeout",String.valueOf(waitMs)); + props.setProperty("limit",String.valueOf(limit)); + String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic,props); + return MRClientFactory.createConsumer ( topicProducerPropFileName); + } catch (IOException e1) { + LOG.error("failed to createConsumer",e1); + return null; + } + } + + @Override + public synchronized void updateCredentials(String key, String secret) { + LOG.info(String.format("Setting auth to %s for %s", key, this.toString())); + String user = key; + String password = secret; + props.setProperty("user",String.valueOf(user)); + props.setProperty("password",String.valueOf(password)); + client = null; + } + + @Override + public List<String> fetch(int waitMs, int limit) { + Properties properties=configuration.getProperties(); + if(properties!=null && properties.getProperty("metric.enabled")!=null ){ + isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled")); + } + if(isMetricEnabled){ + initMetric(); + } + LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString())); + List<String> out = new ArrayList<String>(); + + // Create client once and reuse it on subsequent fetches. This is + // to support failover to other servers in the DMaaP cluster. + if (client == null) { + LOG.info("Getting DMaaP Client ..."); + client = getClient(waitMs, limit); + } + try { + for (String s : client.fetch(waitMs, limit)) { + out.add(s); + if(isMetricEnabled){ + ((DmaapRequestCounterMetric)metricRegistry.metric("DMAAP_KPI")).incrementRecievedMessage(); + } + } + LOG.debug(String.format("Got %d records from %s", out.size(), this.toString())); + } catch (Exception e) { + // Connection exception + LOG.error(String.format("Dmaap Connection Issue Detected. %s", e.getMessage())); + e.printStackTrace(); + try { + LOG.warn(String.format("Sleeping for %dms to compensate for connection failure", waitMs)); + Thread.sleep(waitMs); + } catch (InterruptedException e2) { + LOG.warn(String.format("Failed to wait for %dms after bad fetch", waitMs)); + } + } + + + return out; + } + + /** + * Close consumer Dmaap client + */ + @Override + public void close() { + LOG.debug("Closing Dmaap consumer client...."); + if (client != null) { + client.close(); + } + } + + @Override + public List<String> fetch() { + return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT); + } + + @Override + public String toString() { + String hostStr = (props == null || props.getProperty("host") == null? "N/A" : props.getProperty("host")); + String group = (props == null || props.getProperty("group") == null? "N/A" : props.getProperty("group")); + String id = (props == null || props.getProperty("id") == null? "N/A" : props.getProperty("id")); + return String.format("Consumer %s/%s listening to %s on [%s]", group, id, topic, hostStr); + } + + @Override + public void useHttps(boolean yes) { + useHttps = yes; + } + + + private MetricService getMetricservice() { + BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext(); + // Get AAIadapter reference + ServiceReference sref = bctx.getServiceReference(MetricService.class.getName()); + if (sref != null) { + LOG.info("Metric Service from bundlecontext"); + return (MetricServiceImpl) bctx.getService(sref); + + } else { + LOG.info("Metric Service error from bundlecontext"); + LOG.warn("Cannot find service reference for org.openecomp.appc.metricservice.MetricService"); + return null; + + } + } + + public Metric getMetric(String name){ + return metricRegistry.metric(name); + } +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java new file mode 100644 index 000000000..79d6b3db7 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java @@ -0,0 +1,220 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.messaging.dmaap.impl; + +import java.io.*; +import java.util.*; +import java.util.concurrent.TimeUnit; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +//import com.att.nsa.cambria.client.CambriaBatchingPublisher; +//import com.att.nsa.cambria.client.CambriaClientBuilders; +//import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; + +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import org.apache.commons.lang3.StringUtils; +import org.openecomp.appc.adapter.message.Producer; +import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapUtil; +import org.openecomp.appc.configuration.Configuration; +import org.openecomp.appc.configuration.ConfigurationFactory; +import org.openecomp.appc.metricservice.MetricRegistry; +import org.openecomp.appc.metricservice.MetricService; +import org.openecomp.appc.metricservice.metric.Metric; +import org.openecomp.appc.metricservice.metric.MetricType; +import org.openecomp.appc.metricservice.metric.DmaapRequestCounterMetric; +import org.openecomp.appc.metricservice.policy.PublishingPolicy; +import org.openecomp.appc.metricservice.publisher.LogPublisher; +import org.osgi.framework.BundleContext; +import org.osgi.framework.FrameworkUtil; +import org.osgi.framework.ServiceReference; + +public class DmaapProducerImpl implements Producer { + + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducerImpl.class); + private static final Configuration configuration = ConfigurationFactory.getConfiguration(); + + private Set<String> topics = new HashSet<String>(); + + private Properties props = null; + private static MetricRegistry metricRegistry; + private boolean useHttps = false; + private DmaapRequestCounterMetric dmaapKpiMetric; + private boolean isMetricEnabled=false; + + private Set<MRBatchingPublisher> clients; + + + public DmaapProducerImpl(Collection<String> urls, String topicName, String user, String password) { + this(urls, (Set<String>)null, user, password); + this.topics = new HashSet<>(); + if (topicName != null) { + for (String topic : topicName.split(",")) { + topics.add(topic); + } + } + } + + public DmaapProducerImpl(Collection<String> urls, Set<String> topicNames, String user, String password) { + topics = topicNames; + if(urls == null || user == null || password == null){ + throw new IllegalArgumentException("one of these mandaory argument is null: urls, user, password" ); + } + this.props = new Properties(); + String urlsStr = StringUtils.join(urls, ','); + props.setProperty("host",urlsStr); + props.setProperty("id", UUID.randomUUID().toString()); + props.setProperty("username",user); + props.setProperty("password",password); + } + private void initMetric() { + LOG.debug("Metric getting initialized"); + MetricService metricService = getMetricservice(); + metricRegistry=metricService.createRegistry("APPC"); + dmaapKpiMetric = metricRegistry.metricBuilderFactory(). + dmaapRequestCounterBuilder(). + withName("DMAAP_KPI").withType(MetricType.COUNTER). + withRecievedMessage(0) + .withPublishedMessage(0) + .build(); + if(metricRegistry.register(dmaapKpiMetric)) { + Metric[] metrics = new Metric[]{dmaapKpiMetric}; + LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics); + LogPublisher[] logPublishers = new LogPublisher[1]; + logPublishers[0] = logPublisher; + PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory(). + scheduledPolicyBuilder().withPublishers(logPublishers). + withMetrics(metrics). + build(); + LOG.debug("Policy getting initialized"); + manuallyScheduledPublishingPolicy.init(); + LOG.debug("Metric initialized"); + } + + } + private Set<MRBatchingPublisher> getClients() { + Set<MRBatchingPublisher> out = new HashSet<MRBatchingPublisher>(); + for (String topic : topics) { + try { + String topicProducerPropFileName = DmaapUtil.createProducerPropFile(topic,props); + final MRBatchingPublisher client = MRClientFactory.createBatchingPublisher (topicProducerPropFileName); + out.add(client); + } catch (Exception e) { + e.printStackTrace(); + } + } + + return out; + } + + @Override + public synchronized void updateCredentials(String key, String secret) { + LOG.info(String.format("Setting auth to %s for %s", key, this.toString())); + String user = key; + String password = secret; + props.setProperty("user",String.valueOf(user)); + props.setProperty("password",String.valueOf(password)); + clients = null; + } + + @Override + public boolean post(String partition, String data) { + boolean success = true; + Properties properties=configuration.getProperties(); + if(properties!=null && properties.getProperty("metric.enabled")!=null ){ + isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled")); + } + if(isMetricEnabled){ + initMetric(); + } + + // Create clients once and reuse them on subsequent posts. This is + // to support failover to other servers in the Dmaap cluster. + if ((clients == null) || (clients.isEmpty())) { + LOG.info("Getting CambriaBatchingPublisher Clients ..."); + clients = getClients(); + } + + for (MRBatchingPublisher client : clients) { + try { + LOG.debug(String.format("Posting %s to %s", data, client)); + client.send(partition, data); + } catch (IOException e) { + e.printStackTrace(); + success = false; + } + } + if(isMetricEnabled){ + ( (DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementPublishedMessage(); + } + return success; + } + + /** + * Close producer Dmaap client + */ + @Override + public void close() { + if ((clients == null) || (clients.isEmpty())) { + return; + } + + LOG.debug("Closing Dmaap producer clients...."); + for (MRBatchingPublisher client : clients) { + try { + client.close(1, TimeUnit.SECONDS); + } catch (IOException | InterruptedException e) { + LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client)); + e.printStackTrace(); + } + } + } + + @Override + public void useHttps(boolean yes) { + useHttps = yes; + } + + private MetricService getMetricservice() { +/* + return AppcDmaapAdapterActivator.getMetricService(); +*/ + + BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext(); + ServiceReference sref = bctx.getServiceReference(MetricService.class.getName()); + if (sref != null) { + LOG.info("Metric Service from bundlecontext"); + return (MetricService) bctx.getService(sref); + + } else { + LOG.info("Metric Service error from bundlecontext"); + LOG.warn("Cannot find service reference for org.openecomp.appc.metricservice.MetricService"); + return null; + + } + } + + public Metric getMetric(String name){ + return metricRegistry.metric(name); + } +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java new file mode 100644 index 000000000..39857b856 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java @@ -0,0 +1,83 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.messaging.dmaap.impl; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +public class DmaapUtil { + private final static String delimiter = "_"; + private static String createPreferredRouteFileIfNotExist(String topic) throws IOException { + String topicPreferredRouteFileName = null; + topicPreferredRouteFileName = topic+"preferredRoute.properties"; + File fo= new File(topicPreferredRouteFileName); + if(!fo.exists()) { + ClassLoader classLoader = DmaapUtil.class.getClassLoader(); + InputStream inputStream = classLoader.getResourceAsStream("preferredRoute.txt"); + Properties props = new Properties(); + props.load(inputStream); + String fileName = topic != null ? topic+delimiter+"MR1" : delimiter+"MR1"; + props.setProperty("preferredRouteKey", fileName); + topicPreferredRouteFileName = topic + "preferredRoute.properties"; + props.store(new FileOutputStream(topicPreferredRouteFileName), "preferredRoute.properties file created on the fly for topic:" + topic + " on:" + System.currentTimeMillis()); + } + return topicPreferredRouteFileName; + } + + public static String createConsumerPropFile(String topic, Properties props)throws IOException { + String defaultProfFileName = "consumer.properties"; + String topicConsumerPropFileName = createConsumerProducerPropFile(topic, defaultProfFileName,props); + return topicConsumerPropFileName; + } + + public static String createProducerPropFile(String topic, Properties props)throws IOException { + String defaultProfFileName = "producer.properties"; + String topicConsumerPropFileName = createConsumerProducerPropFile(topic, defaultProfFileName,props); + return topicConsumerPropFileName; + } + + private static String createConsumerProducerPropFile(String topic, String defaultProfFileName, Properties props) throws IOException { + ClassLoader classLoader = DmaapUtil.class.getClassLoader(); + InputStream inputStream = classLoader.getResourceAsStream(defaultProfFileName); + Properties defaultProps = new Properties(); + defaultProps.load(inputStream); + defaultProps.setProperty("topic",topic); + + String preferredRouteFileName = DmaapUtil.createPreferredRouteFileIfNotExist(topic); + if(props != null && !props.isEmpty()){ + defaultProps.putAll(props); + } + defaultProps.setProperty("topic",topic); + defaultProps.setProperty("DME2preferredRouterFilePath",preferredRouteFileName); + String id = defaultProps.getProperty("id"); + String topicConsumerPropFileName = defaultProfFileName; + topicConsumerPropFileName = id != null ? id+delimiter+topicConsumerPropFileName : delimiter+topicConsumerPropFileName; + topicConsumerPropFileName = topic != null ? topic+delimiter+topicConsumerPropFileName : delimiter+topicConsumerPropFileName; + + defaultProps.store(new FileOutputStream(topicConsumerPropFileName), defaultProfFileName+" file created on the fly for topic:"+topic+" on:"+System.currentTimeMillis()); + return topicConsumerPropFileName; + } + +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/impl/EventSenderImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java index 0f7d40f5b..c671c6fdb 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/impl/EventSenderImpl.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java @@ -19,33 +19,33 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap.impl; +package org.openecomp.appc.adapter.messaging.dmaap.impl; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.openecomp.sdnc.sli.SvcLogicContext; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import org.openecomp.appc.adapter.dmaap.EventSender; -import org.openecomp.appc.adapter.dmaap.Producer; -import org.openecomp.appc.adapter.dmaap.DmaapDestination; -import org.openecomp.appc.adapter.dmaap.event.EventHeader; -import org.openecomp.appc.adapter.dmaap.event.EventMessage; -import org.openecomp.appc.adapter.dmaap.event.EventStatus; -import org.openecomp.appc.adapter.dmaap.DmaapProducer; +import org.openecomp.appc.adapter.message.EventSender; +import org.openecomp.appc.adapter.message.MessageDestination; +import org.openecomp.appc.adapter.message.Producer; +import org.openecomp.appc.adapter.message.event.EventHeader; +import org.openecomp.appc.adapter.message.event.EventMessage; +import org.openecomp.appc.adapter.message.event.EventStatus; +import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl; import org.openecomp.appc.configuration.Configuration; import org.openecomp.appc.configuration.ConfigurationFactory; import org.openecomp.appc.exceptions.APPCException; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import org.openecomp.sdnc.sli.SvcLogicContext; - -public class EventSenderImpl implements EventSender +public class EventSenderDmaapImpl implements EventSender { - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventSenderImpl.class); - public static final String EVENT_TOPIC_WRITE = "event.topic.write"; - public static final String EVENT_CLIENT_KEY = "event.client.key"; - public static final String EVENT_CLIENT_SECRET = "event.client.secret"; - public static final String EVENT_POOL_MEMBERS = "event.pool.members"; + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventSenderDmaapImpl.class); + public static final String EVENT_TOPIC_WRITE = "dmaap.event.topic.write"; + public static final String DMAAP_USERNAME = "dmaap.appc.username"; + public static final String DMAAP_PASSWORD = "dmaap.appc.password"; + public static final String EVENT_POOL_MEMBERS = "dmaap.event.pool.members"; private static Configuration configuration = ConfigurationFactory.getConfiguration(); @@ -59,21 +59,21 @@ public class EventSenderImpl implements EventSender this.producerMap = producerMap; } - public EventSenderImpl(){ + public EventSenderDmaapImpl(){ } public void initialize(){ Properties properties = configuration.getProperties(); String writeTopic; - String apiKey; - String apiSecret; + String username; + String password; final List<String> pool = new ArrayList<>(); - for(DmaapDestination destination:DmaapDestination.values()){ + for(MessageDestination destination: MessageDestination.values()){ writeTopic = properties.getProperty(destination + "." + EVENT_TOPIC_WRITE); - apiKey = properties.getProperty(destination + "." + EVENT_CLIENT_KEY); - apiSecret = properties.getProperty(destination + "." + EVENT_CLIENT_SECRET); + username = properties.getProperty(destination + "." + DMAAP_USERNAME); + password = properties.getProperty(destination + "." + DMAAP_PASSWORD); String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS); if (hostNames != null && !hostNames.isEmpty()) { @@ -83,12 +83,8 @@ public class EventSenderImpl implements EventSender LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS)); LOG.debug(String.format("writeTopic = %s, taken from property: %s", writeTopic, destination + "." + EVENT_TOPIC_WRITE)); - LOG.debug(String.format("apiKey = %s, taken from property: %s", apiKey, destination + "." + EVENT_CLIENT_KEY)); - Producer producer = new DmaapProducer(pool, writeTopic); - - if (apiKey != null && apiSecret != null) { - producer.updateCredentials(apiKey, apiSecret); - } + LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME)); + Producer producer = new DmaapProducerImpl(pool, writeTopic,username, password); for (String url : pool) { if (url.contains("3905") || url.contains("https")) { @@ -103,7 +99,7 @@ public class EventSenderImpl implements EventSender } @Override - public boolean sendEvent(DmaapDestination destination,EventMessage msg) { + public boolean sendEvent(MessageDestination destination, EventMessage msg) { String jsonStr = msg.toJson(); String id = msg.getEventHeader().getEventId(); LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr)); @@ -112,7 +108,43 @@ public class EventSenderImpl implements EventSender } @Override - public boolean sendEvent(DmaapDestination destination,Map<String, String> params, SvcLogicContext ctx) throws APPCException { + public boolean sendEvent(MessageDestination destination, EventMessage msg, String eventTopicName) { + String jsonStr = msg.toJson(); + String id = msg.getEventHeader().getEventId(); + LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr)); + Producer producer = createProducer(destination, eventTopicName); + return producer.post(id, jsonStr); + } + + private Producer createProducer(MessageDestination destination, String eventTopicName) { + Properties properties = configuration.getProperties(); + final List<String> pool = new ArrayList<>(); + String username = properties.getProperty(destination + "." + DMAAP_USERNAME); + String password = properties.getProperty(destination + "." + DMAAP_PASSWORD); + String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS); + + if (hostNames != null && !hostNames.isEmpty()) { + LOG.debug(String.format("hostNames = %s, taken from property: %s", hostNames, destination + "." + EVENT_POOL_MEMBERS)); + Collections.addAll(pool, hostNames.split(",")); + } + + LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS)); + LOG.debug(String.format("writeTopic = %s, taken from property: %s", eventTopicName, destination + "." + EVENT_TOPIC_WRITE)); + LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME)); + Producer producer = new DmaapProducerImpl(pool, eventTopicName,username, password); + + for (String url : pool) { + if (url.contains("3905") || url.contains("https")) { + LOG.debug("Producer should use HTTPS"); + producer.useHttps(true); + break; + } + } + return producer; + } + + @Override + public boolean sendEvent(MessageDestination destination, Map<String, String> params, SvcLogicContext ctx) throws APPCException { if (params == null) { String message = "Parameters map is empty (null)"; @@ -134,10 +166,10 @@ public class EventSenderImpl implements EventSender LOG.error(message); throw new APPCException(message); } - EventMessage dmaapEventMessage = new EventMessage( + EventMessage eventMessage = new EventMessage( new EventHeader(eventTime, apiVer, eventId), new EventStatus(code, reason)); - return sendEvent(destination,dmaapEventMessage); + return sendEvent(destination,eventMessage); } } |