diff options
Diffstat (limited to 'src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java')
-rw-r--r-- | src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java | 927 |
1 files changed, 927 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java new file mode 100644 index 0000000..c5eb3ba --- /dev/null +++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java @@ -0,0 +1,927 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package org.onap.dmaap.mr.client.impl; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.zip.GZIPOutputStream; + +import javax.ws.rs.core.MultivaluedMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.http.HttpException; +import org.apache.http.HttpStatus; +import org.json.JSONArray; +import org.json.JSONObject; +import org.json.JSONTokener; + +import com.att.aft.dme2.api.DME2Client; +import com.att.aft.dme2.api.DME2Exception; +import org.onap.dmaap.mr.client.HostSelector; +import org.onap.dmaap.mr.client.MRBatchingPublisher; +import org.onap.dmaap.mr.client.response.MRPublisherResponse; +import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; + +public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher { + private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class); + + public static class Builder { + public Builder() { + } + + public Builder againstUrls(Collection<String> baseUrls) { + fUrls = baseUrls; + return this; + } + + public Builder againstUrlsOrServiceName ( Collection<String> baseUrls, Collection<String> serviceName, String transportype ) + { + fUrls = baseUrls; + fServiceName = serviceName; + fTransportype = transportype; + return this; + } + + public Builder onTopic(String topic) { + fTopic = topic; + return this; + } + + public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) { + fMaxBatchSize = maxBatchSize; + fMaxBatchAgeMs = maxBatchAgeMs; + return this; + } + + public Builder compress(boolean compress) { + fCompress = compress; + return this; + } + + public Builder httpThreadTime(int threadOccuranceTime) { + this.threadOccuranceTime = threadOccuranceTime; + return this; + } + + public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) { + fAllowSelfSignedCerts = allowSelfSignedCerts; + return this; + } + + public Builder withResponse(boolean withResponse) { + fWithResponse = withResponse; + return this; + } + + public MRSimplerBatchPublisher build() { + if (!fWithResponse) { + try { + return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, + fAllowSelfSignedCerts, threadOccuranceTime); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + } else { + try { + return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, + fAllowSelfSignedCerts, fMaxBatchSize); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + } + + } + + private Collection<String> fUrls; + private Collection<String> fServiceName; + private String fTransportype; + private String fTopic; + private int fMaxBatchSize = 100; + private long fMaxBatchAgeMs = 1000; + private boolean fCompress = false; + private int threadOccuranceTime = 50; + private boolean fAllowSelfSignedCerts = false; + private boolean fWithResponse = false; + + }; + + @Override + public int send(String partition, String msg) { + return send(new message(partition, msg)); + } + + @Override + public int send(String msg) { + return send(new message(null, msg)); + } + + @Override + public int send(message msg) { + final LinkedList<message> list = new LinkedList<message>(); + list.add(msg); + return send(list); + } + + @Override + public synchronized int send(Collection<message> msgs) { + if (fClosed) { + throw new IllegalStateException("The publisher was closed."); + } + + for (message userMsg : msgs) { + fPending.add(new TimestampedMessage(userMsg)); + } + return getPendingMessageCount(); + } + + @Override + public synchronized int getPendingMessageCount() { + return fPending.size(); + } + + @Override + public void close() { + try { + final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + if (remains.isEmpty()) { + getLog().warn("Closing publisher with " + remains.size() + " messages unsent. " + + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close."); + } + } catch (InterruptedException e) { + getLog().warn("Possible message loss. " + e.getMessage(), e); + Thread.currentThread().interrupt(); + } catch (IOException e) { + getLog().warn("Possible message loss. " + e.getMessage(), e); + } + } + + @Override + public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException { + synchronized (this) { + fClosed = true; + + // stop the background sender + fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + fExec.shutdown(); + } + + final long now = Clock.now(); + final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit); + final long timeoutAtMs = now + waitInMs; + + while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) { + send(true); + Thread.sleep(250); + } + + synchronized (this) { + final LinkedList<message> result = new LinkedList<message>(); + fPending.drainTo(result); + return result; + } + } + + /** + * Possibly send a batch to the MR server. This is called by the background + * thread and the close() method + * + * @param force + */ + private synchronized void send(boolean force) { + if (force || shouldSendNow()) { + if (!sendBatch()) { + getLog().warn("Send failed, " + fPending.size() + " message to send."); + + // note the time for back-off + fDontSendUntilMs = sfWaitAfterError + Clock.now(); + } + } + } + + private synchronized boolean shouldSendNow() { + boolean shouldSend = false; + if (fPending.isEmpty()) { + final long nowMs = Clock.now(); + + shouldSend = (fPending.size() >= fMaxBatchSize); + if (!shouldSend) { + final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs; + shouldSend = sendAtMs <= nowMs; + } + + // however, wait after an error + shouldSend = shouldSend && nowMs >= fDontSendUntilMs; + } + return shouldSend; + } + + /** + * Method to parse published JSON Objects and Arrays + * + * @return JSONArray + */ + private JSONArray parseJSON() { + JSONArray jsonArray = new JSONArray(); + for (TimestampedMessage m : fPending) { + JSONTokener jsonTokener = new JSONTokener(m.fMsg); + JSONObject jsonObject = null; + JSONArray tempjsonArray = null; + final char firstChar = jsonTokener.next(); + jsonTokener.back(); + if ('[' == firstChar) { + tempjsonArray = new JSONArray(jsonTokener); + if (null != tempjsonArray) { + for (int i = 0; i < tempjsonArray.length(); i++) { + jsonArray.put(tempjsonArray.getJSONObject(i)); + } + } + } else { + jsonObject = new JSONObject(jsonTokener); + jsonArray.put(jsonObject); + } + + } + return jsonArray; + } + + private synchronized boolean sendBatch() { + // it's possible for this call to be made with an empty list. in this + // case, just return. + if (fPending.size() < 1) { + return true; + } + + final long nowMs = Clock.now(); + + if (this.fHostSelector != null) { + host = this.fHostSelector.selectBaseHost(); + } + + final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"), + props.getProperty("partition")); + + try { + + final ByteArrayOutputStream baseStream = new ByteArrayOutputStream(); + OutputStream os = baseStream; + final String contentType = props.getProperty("contenttype"); + if (contentType.equalsIgnoreCase("application/json")) { + JSONArray jsonArray = parseJSON(); + os.write(jsonArray.toString().getBytes()); + os.close(); + + } else if (contentType.equalsIgnoreCase("text/plain")) { + for (TimestampedMessage m : fPending) { + os.write(m.fMsg.getBytes()); + os.write('\n'); + } + os.close(); + } else if (contentType.equalsIgnoreCase("application/cambria") + || (contentType.equalsIgnoreCase("application/cambria-zip"))) { + if (contentType.equalsIgnoreCase("application/cambria-zip")) { + os = new GZIPOutputStream(baseStream); + } + for (TimestampedMessage m : fPending) { + + os.write(("" + m.fPartition.length()).getBytes()); + os.write('.'); + os.write(("" + m.fMsg.length()).getBytes()); + os.write('.'); + os.write(m.fPartition.getBytes()); + os.write(m.fMsg.getBytes()); + os.write('\n'); + } + os.close(); + } else { + for (TimestampedMessage m : fPending) { + os.write(m.fMsg.getBytes()); + + } + os.close(); + } + + final long startMs = Clock.now(); + if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { + + DME2Configue(); + + Thread.sleep(5); + getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: " + + (nowMs - fPending.peek().timestamp) + " ms"); + sender.setPayload(os.toString()); + String dmeResponse = sender.sendAndWait(5000L); + + final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse.toString(); + getLog().info(logLine); + fPending.clear(); + return true; + } + + if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { + getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + + (nowMs - fPending.peek().timestamp) + " ms"); + final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, + username, password, protocolFlag); + // Here we are checking for error response. If HTTP status + // code is not within the http success response code + // then we consider this as error and return false + if (result.getInt("status") < 200 || result.getInt("status") > 299) { + return false; + } + final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); + getLog().info(logLine); + fPending.clear(); + return true; + } + + if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { + getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + + (nowMs - fPending.peek().timestamp) + " ms"); + final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password, + protocolFlag); + + // Here we are checking for error response. If HTTP status + // code is not within the http success response code + // then we consider this as error and return false + if (result.getInt("status") < 200 || result.getInt("status") > 299) { + return false; + } + final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); + getLog().info(logLine); + fPending.clear(); + return true; + } + + if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) { + getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + + (nowMs - fPending.peek().timestamp) + " ms"); + final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType); + + // Here we are checking for error response. If HTTP status + // code is not within the http success response code + // then we consider this as error and return false + if (result.getInt("status") < 200 || result.getInt("status") > 299) { + return false; + } + final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); + getLog().info(logLine); + fPending.clear(); + return true; + } + } catch (IllegalArgumentException x) { + getLog().warn(x.getMessage(), x); + } catch (IOException x) { + getLog().warn(x.getMessage(), x); + } catch (HttpException x) { + getLog().warn(x.getMessage(), x); + } catch (Exception x) { + getLog().warn(x.getMessage(), x); + } + return false; + } + + public synchronized MRPublisherResponse sendBatchWithResponse() { + // it's possible for this call to be made with an empty list. in this + // case, just return. + if (fPending.isEmpty()) { + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); + pubResponse.setResponseMessage("No Messages to send"); + return pubResponse; + } + + final long nowMs = Clock.now(); + + host = this.fHostSelector.selectBaseHost(); + + final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"), + props.getProperty("partition")); + OutputStream os = null; + try { + + final ByteArrayOutputStream baseStream = new ByteArrayOutputStream(); + os = baseStream; + final String contentType = props.getProperty("contenttype"); + if (contentType.equalsIgnoreCase("application/json")) { + JSONArray jsonArray = parseJSON(); + os.write(jsonArray.toString().getBytes()); + } else if (contentType.equalsIgnoreCase("text/plain")) { + for (TimestampedMessage m : fPending) { + os.write(m.fMsg.getBytes()); + os.write('\n'); + } + } else if (contentType.equalsIgnoreCase("application/cambria") + || (contentType.equalsIgnoreCase("application/cambria-zip"))) { + if (contentType.equalsIgnoreCase("application/cambria-zip")) { + os = new GZIPOutputStream(baseStream); + } + for (TimestampedMessage m : fPending) { + + os.write(("" + m.fPartition.length()).getBytes()); + os.write('.'); + os.write(("" + m.fMsg.length()).getBytes()); + os.write('.'); + os.write(m.fPartition.getBytes()); + os.write(m.fMsg.getBytes()); + os.write('\n'); + } + os.close(); + } else { + for (TimestampedMessage m : fPending) { + os.write(m.fMsg.getBytes()); + + } + } + + final long startMs = Clock.now(); + if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { + + try { + DME2Configue(); + + Thread.sleep(5); + getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: " + + (nowMs - fPending.peek().timestamp) + " ms"); + sender.setPayload(os.toString()); + + String dmeResponse = sender.sendAndWait(5000L); + + pubResponse = createMRPublisherResponse(dmeResponse, pubResponse); + + if (Integer.valueOf(pubResponse.getResponseCode()) < 200 + || Integer.valueOf(pubResponse.getResponseCode()) > 299) { + + return pubResponse; + } + final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString(); + getLog().info(logLine); + fPending.clear(); + + } catch (DME2Exception x) { + getLog().warn(x.getMessage(), x); + pubResponse.setResponseCode(x.getErrorCode()); + pubResponse.setResponseMessage(x.getErrorMessage()); + } catch (URISyntaxException x) { + + getLog().warn(x.getMessage(), x); + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); + pubResponse.setResponseMessage(x.getMessage()); + } catch (Exception x) { + + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + pubResponse.setResponseMessage(x.getMessage()); + logger.error("exception: ", x); + + } + + return pubResponse; + } + + if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { + getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + + (nowMs - fPending.peek().timestamp) + " ms"); + final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey, + authDate, username, password, protocolFlag); + // Here we are checking for error response. If HTTP status + // code is not within the http success response code + // then we consider this as error and return false + + pubResponse = createMRPublisherResponse(result, pubResponse); + + if (Integer.valueOf(pubResponse.getResponseCode()) < 200 + || Integer.valueOf(pubResponse.getResponseCode()) > 299) { + + return pubResponse; + } + + final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); + getLog().info(logLine); + fPending.clear(); + return pubResponse; + } + + if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { + getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + + (nowMs - fPending.peek().timestamp) + " ms"); + final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username, + password, protocolFlag); + + // Here we are checking for error response. If HTTP status + // code is not within the http success response code + // then we consider this as error and return false + pubResponse = createMRPublisherResponse(result, pubResponse); + + if (Integer.valueOf(pubResponse.getResponseCode()) < 200 + || Integer.valueOf(pubResponse.getResponseCode()) > 299) { + + return pubResponse; + } + + final String logLine = String.valueOf((Clock.now() - startMs)); + getLog().info(logLine); + fPending.clear(); + return pubResponse; + } + + if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) { + getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + + (nowMs - fPending.peek().timestamp) + " ms"); + final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType); + + // Here we are checking for error response. If HTTP status + // code is not within the http success response code + // then we consider this as error and return false + pubResponse = createMRPublisherResponse(result, pubResponse); + + if (Integer.valueOf(pubResponse.getResponseCode()) < 200 + || Integer.valueOf(pubResponse.getResponseCode()) > 299) { + + return pubResponse; + } + + final String logLine = String.valueOf((Clock.now() - startMs)); + getLog().info(logLine); + fPending.clear(); + return pubResponse; + } + } catch (IllegalArgumentException x) { + getLog().warn(x.getMessage(), x); + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); + pubResponse.setResponseMessage(x.getMessage()); + + } catch (IOException x) { + getLog().warn(x.getMessage(), x); + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + pubResponse.setResponseMessage(x.getMessage()); + + } catch (HttpException x) { + getLog().warn(x.getMessage(), x); + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); + pubResponse.setResponseMessage(x.getMessage()); + + } catch (Exception x) { + getLog().warn(x.getMessage(), x); + + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + pubResponse.setResponseMessage(x.getMessage()); + + } + + finally { + if (fPending.size() > 0) { + getLog().warn("Send failed, " + fPending.size() + " message to send."); + pubResponse.setPendingMsgs(fPending.size()); + } + if (os != null) { + try { + os.close(); + } catch (Exception x) { + getLog().warn(x.getMessage(), x); + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + pubResponse.setResponseMessage("Error in closing Output Stream"); + } + } + } + + return pubResponse; + } + + public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) { + + if (reply.isEmpty()) { + + mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); + mrPubResponse.setResponseMessage("Please verify the Producer properties"); + } else if (reply.startsWith("{")) { + JSONObject jObject = new JSONObject(reply); + if (jObject.has("message") && jObject.has("status")) { + String message = jObject.getString("message"); + if (null != message) { + mrPubResponse.setResponseMessage(message); + } + mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status"))); + } else { + mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK)); + mrPubResponse.setResponseMessage(reply); + } + } else if (reply.startsWith("<")) { + String responseCode = getHTTPErrorResponseCode(reply); + if (responseCode.contains("403")) { + responseCode = "403"; + } + mrPubResponse.setResponseCode(responseCode); + mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply)); + } + + return mrPubResponse; + } + + private final String fTopic; + private final int fMaxBatchSize; + private final long fMaxBatchAgeMs; + private final boolean fCompress; + private int threadOccuranceTime; + private boolean fClosed; + private String username; + private String password; + private String host; + + // host selector + private HostSelector fHostSelector = null; + + private final LinkedBlockingQueue<TimestampedMessage> fPending; + private long fDontSendUntilMs; + private final ScheduledThreadPoolExecutor fExec; + + private String latitude; + private String longitude; + private String version; + private String serviceName; + private String env; + private String partner; + private String routeOffer; + private String subContextPath; + private String protocol; + private String methodType; + private String url; + private String dmeuser; + private String dmepassword; + private String contentType; + private static final long sfWaitAfterError = 10000; + private HashMap<String, String> DMETimeOuts; + private DME2Client sender; + public String protocolFlag = ProtocolTypeConstants.DME2.getValue(); + private String authKey; + private String authDate; + private String handlers; + private Properties props; + public static String routerFilePath; + protected static final Map<String, String> headers = new HashMap<String, String>(); + public static MultivaluedMap<String, Object> headersMap; + + private MRPublisherResponse pubResponse; + + public MRPublisherResponse getPubResponse() { + return pubResponse; + } + + public void setPubResponse(MRPublisherResponse pubResponse) { + this.pubResponse = pubResponse; + } + + public static String getRouterFilePath() { + return routerFilePath; + } + + public static void setRouterFilePath(String routerFilePath) { + MRSimplerBatchPublisher.routerFilePath = routerFilePath; + } + + public Properties getProps() { + return props; + } + + public void setProps(Properties props) { + this.props = props; + } + + public String getProtocolFlag() { + return protocolFlag; + } + + public void setProtocolFlag(String protocolFlag) { + this.protocolFlag = protocolFlag; + } + + private void DME2Configue() throws Exception { + try { + + latitude = props.getProperty("Latitude"); + longitude = props.getProperty("Longitude"); + version = props.getProperty("Version"); + serviceName = props.getProperty("ServiceName"); + env = props.getProperty("Environment"); + partner = props.getProperty("Partner"); + routeOffer = props.getProperty("routeOffer"); + subContextPath = props.getProperty("SubContextPath") + fTopic; + + protocol = props.getProperty("Protocol"); + methodType = props.getProperty("MethodType"); + dmeuser = props.getProperty("username"); + dmepassword = props.getProperty("password"); + contentType = props.getProperty("contenttype"); + handlers = props.getProperty("sessionstickinessrequired"); + routerFilePath = props.getProperty("DME2preferredRouterFilePath"); + + /** + * Changes to DME2Client url to use Partner for auto failover + * between data centers When Partner value is not provided use the + * routeOffer value for auto failover within a cluster + */ + + String partitionKey = props.getProperty("partition"); + + if (partner != null && !partner.isEmpty()) { + url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + + partner; + if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) { + url = url + "&partitionKey=" + partitionKey; + } + } else if (routeOffer != null && !routeOffer.isEmpty()) { + url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer=" + + routeOffer; + if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) { + url = url + "&partitionKey=" + partitionKey; + } + } + + DMETimeOuts = new HashMap<String, String>(); + DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS")); + DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS")); + DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT")); + DMETimeOuts.put("Content-Type", contentType); + System.setProperty("AFT_LATITUDE", latitude); + System.setProperty("AFT_LONGITUDE", longitude); + System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT")); + // System.setProperty("DME2.DEBUG", "true"); + + // SSL changes + // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", + + System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2"); + System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false"); + System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit"); + + // SSL changes + + sender = new DME2Client(new URI(url), 5000L); + + sender.setAllowAllHttpReturnCodes(true); + sender.setMethod(methodType); + sender.setSubContext(subContextPath); + sender.setCredentials(dmeuser, dmepassword); + sender.setHeaders(DMETimeOuts); + if (handlers != null &&handlers.equalsIgnoreCase("yes")) { + sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", + props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS")); + sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", + props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS")); + sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON")); + } else { + sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler"); + } + } catch (DME2Exception x) { + getLog().warn(x.getMessage(), x); + throw new DME2Exception(x.getErrorCode(), x.getErrorMessage()); + } catch (URISyntaxException x) { + + getLog().warn(x.getMessage(), x); + throw new URISyntaxException(url, x.getMessage()); + } catch (Exception x) { + + getLog().warn(x.getMessage(), x); + throw new IllegalArgumentException(x.getMessage()); + } + } + + private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, + boolean compress) throws MalformedURLException { + super(hosts); + + if (topic == null || topic.length() < 1) { + throw new IllegalArgumentException("A topic must be provided."); + } + + fHostSelector = new HostSelector(hosts, null); + fClosed = false; + fTopic = topic; + fMaxBatchSize = maxBatchSize; + fMaxBatchAgeMs = maxBatchAgeMs; + fCompress = compress; + + fPending = new LinkedBlockingQueue<TimestampedMessage>(); + fDontSendUntilMs = 0; + fExec = new ScheduledThreadPoolExecutor(1); + pubResponse = new MRPublisherResponse(); + + } + + private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, + boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException { + super(hosts); + + if (topic == null || topic.length() < 1) { + throw new IllegalArgumentException("A topic must be provided."); + } + + fHostSelector = new HostSelector(hosts, null); + fClosed = false; + fTopic = topic; + fMaxBatchSize = maxBatchSize; + fMaxBatchAgeMs = maxBatchAgeMs; + fCompress = compress; + threadOccuranceTime = httpThreadOccurnace; + fPending = new LinkedBlockingQueue<TimestampedMessage>(); + fDontSendUntilMs = 0; + fExec = new ScheduledThreadPoolExecutor(1); + fExec.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + send(false); + } + }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS); + pubResponse = new MRPublisherResponse(); + } + + private static class TimestampedMessage extends message { + public TimestampedMessage(message m) { + super(m); + timestamp = Clock.now(); + } + + public final long timestamp; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public String getContentType() { + return contentType; + } + + public void setContentType(String contentType) { + this.contentType = contentType; + } + + public String getAuthKey() { + return authKey; + } + + public void setAuthKey(String authKey) { + this.authKey = authKey; + } + + public String getAuthDate() { + return authDate; + } + + public void setAuthDate(String authDate) { + this.authDate = authDate; + } + +} |