aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/com/att/nsa/mr/client/MRClientFactory.java19
-rw-r--r--src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java46
-rw-r--r--src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java16
-rw-r--r--src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java176
4 files changed, 245 insertions, 12 deletions
diff --git a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java
index 32b44e4..f237776 100644
--- a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java
+++ b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java
@@ -39,6 +39,7 @@ import com.att.nsa.mr.client.impl.MRConsumerImpl;
import com.att.nsa.mr.client.impl.MRMetaClient;
import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
+import com.att.nsa.mr.tools.ValidatorUtil;
/**
* A factory for MR clients.<br/>
@@ -483,7 +484,7 @@ public class MRClientFactory {
MRSimplerBatchPublisher pub;
if (withResponse) {
pub = new MRSimplerBatchPublisher.Builder()
- .againstUrls(MRConsumerImpl.stringToList(props.getProperty("host")))
+ .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty("TransportType"))
.onTopic(props.getProperty("topic"))
.batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
Integer.parseInt(props.getProperty("maxAgeMs").toString()))
@@ -492,7 +493,7 @@ public class MRClientFactory {
.withResponse(withResponse).build();
} else {
pub = new MRSimplerBatchPublisher.Builder()
- .againstUrls(MRConsumerImpl.stringToList(props.getProperty("host")))
+ .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty("TransportType"))
.onTopic(props.getProperty("topic"))
.batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
Integer.parseInt(props.getProperty("maxAgeMs").toString()))
@@ -512,13 +513,16 @@ public class MRClientFactory {
}
pub.setProtocolFlag(props.getProperty("TransportType"));
pub.setProps(props);
- routeFilePath = props.getProperty("DME2preferredRouterFilePath");
- routeReader = new FileReader(new File(routeFilePath));
prop = new Properties();
- File fo = new File(routeFilePath);
- if (!fo.exists()) {
- routeWriter = new FileWriter(new File(routeFilePath));
+ if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
+ routeFilePath = props.getProperty("DME2preferredRouterFilePath");
+ routeReader = new FileReader(new File(routeFilePath));
+ File fo = new File(routeFilePath);
+ if (!fo.exists()) {
+ routeWriter = new FileWriter(new File(routeFilePath));
+ }
}
+ // pub.setContentType(contentType);
return pub;
}
@@ -623,6 +627,7 @@ public class MRClientFactory {
public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
int timeout;
+ ValidatorUtil.validateSubscriber(props);
if (props.getProperty("timeout") != null)
timeout = Integer.parseInt(props.getProperty("timeout"));
else
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java
index b54fedb..faa81ce 100644
--- a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java
+++ b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java
@@ -127,6 +127,8 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
try {
// getLog().info ( "Receiving msgs from: " +
// url+subContextPath );
+ long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs : (timeoutMs+ defaultDme2ReplyHandlerTimeoutMs);
+ //String reply = sender.sendAndWait(timeout);
String reply = sender.sendAndWait(timeoutMs + 10000L);
final JSONObject o = getResponseDataInJson(reply);
// msgs.add(reply);
@@ -362,6 +364,11 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
private HashMap<String, String> DMETimeOuts;
private String handlers;
public static final String routerFilePath = null;
+ private long dme2ReplyHandlerTimeoutMs;
+ private long longPollingMs;
+ private static final long defaultDme2PerEndPointTimeoutMs = 10000L;
+ private static final long defaultDme2ReplyHandlerTimeoutMs = 10000L;
+
public static String getRouterFilePath() {
return routerFilePath;
@@ -388,6 +395,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
}
private void DMEConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
+ this.longPollingMs = timeoutMs;
latitude = props.getProperty("Latitude");
longitude = props.getProperty("Longitude");
version = props.getProperty("Version");
@@ -459,7 +467,38 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
// SSL changes
- sender = new DME2Client(new URI(url), timeoutMs + 10000L);
+ long dme2PerEndPointTimeoutMs;
+ try {
+ dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty("DME2_PER_HANDLER_TIMEOUT_MS"));
+ //backward compatibility
+ if ( dme2PerEndPointTimeoutMs <= 0) {
+ dme2PerEndPointTimeoutMs = timeoutMs + defaultDme2PerEndPointTimeoutMs;
+ }
+ } catch (NumberFormatException nfe) {
+ //backward compatibility
+ dme2PerEndPointTimeoutMs = timeoutMs + defaultDme2PerEndPointTimeoutMs;
+ getLog().debug("DME2_PER_HANDLER_TIMEOUT_MS not set and using default " + defaultDme2PerEndPointTimeoutMs);
+ }
+
+ try {
+ dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty("DME2_REPLY_HANDLER_TIMEOUT_MS"));
+ } catch (NumberFormatException nfe) {
+ try {
+ long dme2EpReadTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
+ long dme2EpConnTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
+ dme2ReplyHandlerTimeoutMs = timeoutMs + dme2EpReadTimeoutMs + dme2EpConnTimeoutMs;
+ getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default from timeoutMs, AFT_DME2_EP_READ_TIMEOUT_MS and AFT_DME2_EP_CONN_TIMEOUT " + dme2ReplyHandlerTimeoutMs);
+ }catch (NumberFormatException e) {
+ //backward compatibility
+ dme2ReplyHandlerTimeoutMs = timeoutMs + defaultDme2ReplyHandlerTimeoutMs;
+ getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default " + dme2ReplyHandlerTimeoutMs);
+ }
+ }
+ //backward compatibility
+ if ( dme2ReplyHandlerTimeoutMs <= 0) {
+ dme2ReplyHandlerTimeoutMs = timeoutMs + defaultDme2ReplyHandlerTimeoutMs;
+ }
+ sender = new DME2Client(new URI(url),dme2PerEndPointTimeoutMs);
sender.setAllowAllHttpReturnCodes(true);
sender.setMethod(methodType);
sender.setSubContext(subContextPath);
@@ -469,7 +508,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
sender.setHeaders(DMETimeOuts);
sender.setPayload("");
- if (handlers.equalsIgnoreCase("yes")) {
+ if (handlers!=null&&handlers.equalsIgnoreCase("yes")) {
sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
@@ -594,7 +633,8 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
DMEConfigure(timeoutMs, limit);
- String reply = sender.sendAndWait(timeoutMs + 10000L);
+ long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs : (timeoutMs+ defaultDme2ReplyHandlerTimeoutMs);
+ String reply = sender.sendAndWait(timeout);
final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java
index 2f7680b..5bb5087 100644
--- a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java
+++ b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java
@@ -67,6 +67,14 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
fUrls = baseUrls;
return this;
}
+
+ public Builder againstUrlsOrServiceName ( Collection<String> baseUrls, Collection<String> serviceName, String transportype )
+ {
+ fUrls = baseUrls;
+ fServiceName = serviceName;
+ fTransportype = transportype;
+ return this;
+ }
public Builder onTopic(String topic) {
fTopic = topic;
@@ -119,6 +127,8 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
}
private Collection<String> fUrls;
+ private Collection<String> fServiceName;
+ private String fTransportype;
private String fTopic;
private int fMaxBatchSize = 100;
private long fMaxBatchAgeMs = 1000;
@@ -278,7 +288,9 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
final long nowMs = Clock.now();
- host = this.fHostSelector.selectBaseHost();
+ if (this.fHostSelector != null) {
+ host = this.fHostSelector.selectBaseHost();
+ }
final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
props.getProperty("partition"));
@@ -796,7 +808,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
sender.setSubContext(subContextPath);
sender.setCredentials(dmeuser, dmepassword);
sender.setHeaders(DMETimeOuts);
- if (handlers.equalsIgnoreCase("yes")) {
+ if (handlers != null &&handlers.equalsIgnoreCase("yes")) {
sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
diff --git a/src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java b/src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java
new file mode 100644
index 0000000..900c932
--- /dev/null
+++ b/src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java
@@ -0,0 +1,176 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.mr.tools;
+
+import java.util.Properties;
+
+import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
+
+public class ValidatorUtil {
+
+ public static void validatePublisher(Properties props) {
+ String transportType = props.getProperty("TransportType");
+ if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(transportType)) {
+ validateForDME2(props);
+ } else {
+ validateForNonDME2(props);
+ }
+ String maxBatchSize = props.getProperty("maxBatchSize");
+ if (maxBatchSize == null || maxBatchSize.isEmpty()) {
+ throw new IllegalArgumentException ( "maxBatchSize is needed" );
+ }
+ String maxAgeMs = props.getProperty("maxAgeMs");
+ if (maxAgeMs == null || maxAgeMs.isEmpty()) {
+ throw new IllegalArgumentException ( "maxAgeMs is needed" );
+ }
+ String messageSentThreadOccurance = props.getProperty("MessageSentThreadOccurance");
+ if (messageSentThreadOccurance == null || messageSentThreadOccurance.isEmpty()) {
+ throw new IllegalArgumentException ( "MessageSentThreadOccurance is needed" );
+ }
+
+ }
+
+ public static void validateSubscriber(Properties props) {
+ String transportType = props.getProperty("TransportType");
+ if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(transportType)) {
+ validateForDME2(props);
+ } else {
+ validateForNonDME2(props);
+ }
+ String group = props.getProperty("group");
+ if (group == null || group.isEmpty()) {
+ throw new IllegalArgumentException ( "group is needed" );
+ }
+ String id = props.getProperty("id");
+ if (id == null || id.isEmpty()) {
+ throw new IllegalArgumentException ( "Consumer (Id) is needed" );
+ }
+ String sessionstickinessrequired = props.getProperty("sessionstickinessrequired");
+ if (sessionstickinessrequired == null || sessionstickinessrequired.isEmpty()) {
+ throw new IllegalArgumentException ( "sessionstickinessrequired is needed" );
+ }
+ }
+
+ private static void validateForDME2(Properties props) {
+ String serviceName = props.getProperty("ServiceName");
+ if (serviceName == null || serviceName.isEmpty()) {
+ throw new IllegalArgumentException ( "Servicename is needed" );
+ }
+ String topic = props.getProperty("topic");
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException ( "topic is needed" );
+ }
+ String username = props.getProperty("username");
+ if (username == null || username.isEmpty()) {
+ throw new IllegalArgumentException ( "username is needed" );
+ }
+ String password = props.getProperty("password");
+ if (password == null || password.isEmpty()) {
+ throw new IllegalArgumentException ( "password is needed" );
+ }
+ String dME2preferredRouterFilePath = props.getProperty("DME2preferredRouterFilePath");
+ if (dME2preferredRouterFilePath == null || dME2preferredRouterFilePath.isEmpty()) {
+ throw new IllegalArgumentException ( "DME2preferredRouterFilePath is needed" );
+ }
+ String partner = props.getProperty("Partner");
+ String routeOffer = props.getProperty("routeOffer");
+ if ((partner == null || partner.isEmpty()) && (routeOffer == null || routeOffer.isEmpty())) {
+ throw new IllegalArgumentException ( "Partner or routeOffer is needed" );
+ }
+ String protocol = props.getProperty("Protocol");
+ if (protocol == null || protocol.isEmpty()) {
+ throw new IllegalArgumentException ( "Protocol is needed" );
+ }
+ String methodType = props.getProperty("MethodType");
+ if (methodType == null || methodType.isEmpty()) {
+ throw new IllegalArgumentException ( "MethodType is needed" );
+ }
+ String contenttype = props.getProperty("contenttype");
+ if (contenttype == null || contenttype.isEmpty()) {
+ throw new IllegalArgumentException ( "contenttype is needed" );
+ }
+ String latitude = props.getProperty("Latitude");
+ if (latitude == null || latitude.isEmpty()) {
+ throw new IllegalArgumentException ( "Latitude is needed" );
+ }
+ String longitude = props.getProperty("Longitude");
+ if (longitude == null || longitude.isEmpty()) {
+ throw new IllegalArgumentException ( "Longitude is needed" );
+ }
+ String aftEnv = props.getProperty("AFT_ENVIRONMENT");
+ if (aftEnv == null || aftEnv.isEmpty()) {
+ throw new IllegalArgumentException ( "AFT_ENVIRONMENT is needed" );
+ }
+ String version = props.getProperty("Version");
+ if (version == null || version.isEmpty()) {
+ throw new IllegalArgumentException ( "Version is needed" );
+ }
+ String environment = props.getProperty("Environment");
+ if (environment == null || environment.isEmpty()) {
+ throw new IllegalArgumentException ( "Environment is needed" );
+ }
+ String subContextPath = props.getProperty("SubContextPath");
+ if (subContextPath == null || subContextPath.isEmpty()) {
+ throw new IllegalArgumentException ( "SubContextPath is needed" );
+ }
+ }
+
+ private static void validateForNonDME2(Properties props) {
+ String transportType = props.getProperty("TransportType");
+ String host = props.getProperty("host");
+ if (host == null || host.isEmpty()) {
+ throw new IllegalArgumentException ( "Servicename is needed" );
+ }
+ String topic = props.getProperty("topic");
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException ( "topic is needed" );
+ }
+ String methodType = props.getProperty("MethodType");
+ if (methodType == null || methodType.isEmpty()) {
+ throw new IllegalArgumentException ( "MethodType is needed" );
+ }
+ String contenttype = props.getProperty("contenttype");
+ if (contenttype == null || contenttype.isEmpty()) {
+ throw new IllegalArgumentException ( "contenttype is needed" );
+ }
+ String username = props.getProperty("username");
+ if (username == null || username.isEmpty()) {
+ throw new IllegalArgumentException ( "username is needed" );
+ }
+ String password = props.getProperty("password");
+ if (password == null || password.isEmpty()) {
+ throw new IllegalArgumentException ( "password is needed" );
+ }
+ if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(transportType)) {
+ String authKey = props.getProperty("authKey");
+ if (authKey == null || authKey.isEmpty()) {
+ throw new IllegalArgumentException ( "authKey is needed" );
+ }
+ String authDate = props.getProperty("authDate");
+ if (authDate == null || authDate.isEmpty()) {
+ throw new IllegalArgumentException ( "password is needed" );
+ }
+
+ }
+ }
+
+}