aboutsummaryrefslogtreecommitdiffstats
path: root/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils
diff options
context:
space:
mode:
Diffstat (limited to 'dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils')
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtils.java295
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSinkConfigMapper.java112
-rw-r--r--dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSourceConfigMapper.java118
3 files changed, 525 insertions, 0 deletions
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtils.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtils.java
new file mode 100644
index 0000000..af191c5
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtils.java
@@ -0,0 +1,295 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.utils;
+
+import co.cask.cdap.api.data.format.StructuredRecord;
+import co.cask.cdap.api.data.schema.Schema;
+import co.cask.cdap.etl.api.PipelineConfigurer;
+import com.google.common.base.Function;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
+import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap.DMaaPSourceOutputSchema;
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * @author Rajiv Singla . Creation Date: 1/26/2017.
+ */
+public abstract class CDAPPluginUtils extends AnalyticsModelJsonUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CDAPPluginUtils.class);
+
+ public static final Function<Schema, Schema.Type> SCHEMA_TO_TYPE_FUNCTION = new Function<Schema, Schema.Type>() {
+ @Override
+ public Schema.Type apply(@Nonnull Schema schema) {
+ return schema.getType();
+ }
+ };
+
+
+
+ private CDAPPluginUtils() {
+ // private constructor
+ }
+
+ /**
+ * Validates if CDAP Schema contains expected fields
+ *
+ * @param schema schema that need to be validated
+ * @param expectedFields fields that are expected to be in the schema
+ */
+
+ public static void validateSchemaContainsFields(@Nullable final Schema schema, final String... expectedFields) {
+
+ LOG.debug("Validating schema:{} contains expected fields:{}", schema, Arrays.toString(expectedFields));
+
+ if (schema == null) {
+ // If input schema is null then no validation possible
+ LOG.warn("Input Schema is null. No validation possible");
+ } else {
+ // Check if expected fields are indeed present in the schema
+ for (String expectedField : expectedFields) {
+ final Schema.Field schemaField = schema.getField(expectedField);
+ if (schemaField == null) {
+ final String errorMessage = String.format(
+ "Unable to find expected field: %s, in schema: %s", expectedField, schema);
+ throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
+ }
+ }
+ LOG.debug("Successfully validated schema:{}, contains expected fields:{}", schema,
+ Arrays.toString(expectedFields));
+ }
+ }
+
+
+ /**
+ * Creates a new Structured Record containing DMaaP MR fetched message
+ *
+ * @param message DMaaP MR fetch message
+ *
+ * @return Structured record containing DMaaP MR Message
+ */
+ public static StructuredRecord createDMaaPMRResponseStructuredRecord(final String message) {
+ StructuredRecord.Builder recordBuilder = StructuredRecord.builder(DMaaPSourceOutputSchema.getSchema());
+ recordBuilder
+ .set(DMaaPSourceOutputSchema.TIMESTAMP.getSchemaColumnName(), System.nanoTime())
+ .set(DMaaPSourceOutputSchema.RESPONSE_CODE.getSchemaColumnName(), 200)
+ .set(DMaaPSourceOutputSchema.RESPONSE_MESSAGE.getSchemaColumnName(), "OK")
+ .set(DMaaPSourceOutputSchema.FETCHED_MESSAGE.getSchemaColumnName(), message);
+ return recordBuilder.build();
+ }
+
+
+ /**
+ * Creates output {@link StructuredRecord.Builder} which has copied values from input {@link StructuredRecord}
+ *
+ * @param outputSchema output Schema
+ * @param inputStructuredRecord input Structured Record
+ *
+ * @return output Structured Record builder with pre populated values from input structured record
+ */
+ public static StructuredRecord.Builder createOutputStructuredRecordBuilder(
+ @Nonnull final Schema outputSchema,
+ @Nonnull final StructuredRecord inputStructuredRecord) {
+
+ // Get input structured Record Schema
+ final Schema inputSchema = inputStructuredRecord.getSchema();
+ // Create new instance of output Structured Record Builder from output Schema
+ final StructuredRecord.Builder outputStructuredRecordBuilder = StructuredRecord.builder(outputSchema);
+
+ // iterate over input fields and if output schema has field with same name copy the value to out record builder
+ for (Schema.Field inputField : inputSchema.getFields()) {
+ final String inputFieldName = inputField.getName();
+ if (outputSchema.getField(inputFieldName) != null) {
+ outputStructuredRecordBuilder.set(inputFieldName, inputStructuredRecord.get(inputFieldName));
+ }
+ }
+
+ return outputStructuredRecordBuilder;
+ }
+
+
+ /**
+ * Adds Field value to {@link StructuredRecord.Builder} if schema contains that field Name
+ *
+ * @param structuredRecordBuilder structured record builder
+ * @param structuredRecordSchema schema for structured record builder
+ * @param fieldName field name
+ * @param fieldValue field value
+ *
+ * @return structured record builder with populated field name and value if schema contains field name
+ */
+ public static StructuredRecord.Builder addFieldValueToStructuredRecordBuilder(
+ @Nonnull final StructuredRecord.Builder structuredRecordBuilder,
+ @Nonnull final Schema structuredRecordSchema,
+ @Nonnull final String fieldName,
+ final Object fieldValue) {
+
+ // check if schema contains field Name
+ if (structuredRecordSchema.getField(fieldName) != null) {
+ structuredRecordBuilder.set(fieldName, fieldValue);
+ } else {
+ LOG.info("Unable to populate value for field Name: {} with field value: {}. " +
+ "Schema Fields: {} does not contain field name: {}",
+ fieldName, fieldValue, structuredRecordSchema.getFields(), fieldName);
+ }
+
+ return structuredRecordBuilder;
+ }
+
+
+ /**
+ * Validates that given schema String has fieldName of expected type. If field does not exist in given schema
+ * then validation will pass with warning. If field does exist in given schema then this validation will return
+ * true if field type is same as expected type else false
+ *
+ * @param schemaString CDAP Plugin output or input schema string
+ * @param fieldName field name
+ * @param expectedFieldType expected schema field type
+ *
+ * @return true if field type matches expected field type else false. If field does not exist in
+ * give schema validation will pass but will generate a warning message
+ */
+ public static boolean validateSchemaFieldType(@Nonnull final String schemaString,
+ @Nonnull final String fieldName,
+ @Nonnull final Schema.Type expectedFieldType) {
+
+ try {
+ // parse given schema String
+ final Schema outputSchema = Schema.parseJson(schemaString);
+ final Schema.Field schemaField = outputSchema.getField(fieldName);
+
+ // if given schema does contain field then validated fieldName type
+ if (schemaField != null) {
+
+ final List<Schema> schemas = new LinkedList<>();
+
+ // if it is a union type then grab all union schemas
+ if (outputSchema.getField(fieldName).getSchema().getType() == Schema.Type.UNION) {
+ final List<Schema> unionFieldSchemas =
+ outputSchema.getField(fieldName).getSchema().getUnionSchemas();
+ schemas.addAll(unionFieldSchemas);
+ } else {
+ // if not union type the just get the field schema
+ final Schema fieldSchema = outputSchema.getField(fieldName).getSchema();
+ schemas.add(fieldSchema);
+ }
+
+ // get all schema types
+ final List<Schema.Type> fieldTypes =
+ Lists.transform(schemas, CDAPPluginUtils.SCHEMA_TO_TYPE_FUNCTION);
+
+ // if all schema types does not contain expected field type then return false
+ if (!fieldTypes.contains(expectedFieldType)) {
+ LOG.error("Validation failed for fieldName: {} is NOT of expected Type: {} in schema: {}",
+ fieldName, expectedFieldType, outputSchema);
+ return false;
+ }
+
+ // field type validation passed
+ LOG.debug("Successfully validated fieldName: {} is of expected Type: {}",
+ fieldName, expectedFieldType);
+
+ return true;
+
+ } else {
+
+ // if field does not exist then the validation will pass but will generate warning message
+ LOG.warn("Validation of field type not possible. Field name: {} does not exist in schema: {}",
+ fieldName, outputSchema);
+ return true;
+ }
+
+ } catch (IOException e) {
+ final String errorMessage =
+ String.format("Unable to parse schema: %s for field type validation. " +
+ "Field Name: %s, Expected Field Type: %s Exception: %s",
+ schemaString, fieldName, expectedFieldType, e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+
+ }
+
+
+ /**
+ * Parses provided schema String as Schema object and set it as output Schema format
+ *
+ * @param pipelineConfigurer plugin pipeline configurer
+ * @param schemaString schema String to be set as output schema
+ */
+ public static void setOutputSchema(final PipelineConfigurer pipelineConfigurer, final String schemaString) {
+ try {
+ final Schema outputSchema = Schema.parseJson(schemaString);
+ pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);
+ } catch (IOException e) {
+ final String errorMessage = String.format(
+ "Schema specified is not a valid JSON. Schema String: %s, Exception: %s", schemaString, e);
+ throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
+ }
+ }
+
+
+ /**
+ * Parses incoming plugin config mapping to key value map. If any of the key value map is blank an Illegal Argument
+ * exception will be thrown
+ *
+ * @param mappingFieldString field Mapping String
+ *
+ * @return map containing mapping key values
+ */
+ public static Map<String, String> extractFieldMappings(final String mappingFieldString) {
+ final Map<String, String> fieldMappings = Maps.newHashMap();
+ if (StringUtils.isNotBlank(mappingFieldString)) {
+ final Splitter commaSplitter = Splitter.on(",");
+ for (String fieldMapping : commaSplitter.split(mappingFieldString)) {
+ final String[] keyValueMappings = fieldMapping.split(":");
+ if (keyValueMappings.length != 2 ||
+ StringUtils.isBlank(keyValueMappings[0]) ||
+ StringUtils.isBlank(keyValueMappings[1])) {
+ final String errorMessage = "Field Mapping key or value is Blank. All field mappings must " +
+ "be present in mappings: " + mappingFieldString;
+ throw new DCAEAnalyticsRuntimeException(
+ errorMessage, LOG, new IllegalArgumentException(errorMessage));
+ }
+ fieldMappings.put(keyValueMappings[0].trim(), keyValueMappings[1].trim());
+ }
+ }
+ return fieldMappings;
+ }
+
+
+
+
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSinkConfigMapper.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSinkConfigMapper.java
new file mode 100644
index 0000000..ebe7d49
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSinkConfigMapper.java
@@ -0,0 +1,112 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.utils;
+
+import com.google.common.base.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.openecomp.dcae.apod.analytics.cdap.common.CDAPPluginConstants.DMaaPMRSinkHadoopConfigFields;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
+
+import javax.annotation.Nonnull;
+
+import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty;
+import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isPresent;
+
+/**
+ * Function that converts {@link Configuration} to {@link DMaaPMRPublisherConfig}
+ * <p>
+ * @author Rajiv Singla . Creation Date: 1/26/2017.
+ */
+public class DMaaPSinkConfigMapper implements Function<Configuration, DMaaPMRPublisherConfig> {
+
+ /**
+ * Static method to map {@link Configuration} to {@link DMaaPMRPublisherConfig}
+ *
+ * @param sinkPluginConfig DMaaP Sink Plugin Config
+ *
+ * @return DMaaP MR Publisher Config
+ */
+ public static DMaaPMRPublisherConfig map(final Configuration sinkPluginConfig) {
+ return new DMaaPSinkConfigMapper().apply(sinkPluginConfig);
+ }
+
+ /**
+ * Converts {@link Configuration} to {@link DMaaPMRPublisherConfig}
+ *
+ * @param configuration Hadoop Configuration containing DMaaP MR Sink field values
+ *
+ * @return DMaaP MR Publisher Config
+ */
+ @Nonnull
+ @Override
+ public DMaaPMRPublisherConfig apply(@Nonnull Configuration configuration) {
+
+ // Create a new publisher settings builder
+ final String hostName = configuration.get(DMaaPMRSinkHadoopConfigFields.HOST_NAME);
+ final String topicName = configuration.get(DMaaPMRSinkHadoopConfigFields.TOPIC_NAME);
+
+ if (isEmpty(hostName) || isEmpty(topicName)) {
+ throw new IllegalStateException("DMaaP MR Sink Host Name and Topic Name must be present");
+ }
+
+ final DMaaPMRPublisherConfig.Builder publisherConfigBuilder =
+ new DMaaPMRPublisherConfig.Builder(hostName, topicName);
+
+ // Setup up any optional publisher parameters if they are present
+ final String portNumber = configuration.get(DMaaPMRSinkHadoopConfigFields.PORT_NUMBER);
+ if (portNumber != null) {
+ publisherConfigBuilder.setPortNumber(Integer.parseInt(portNumber));
+ }
+
+ final String protocol = configuration.get(DMaaPMRSinkHadoopConfigFields.PROTOCOL);
+ if (isPresent(protocol)) {
+ publisherConfigBuilder.setProtocol(protocol);
+ }
+
+ final String userName = configuration.get(DMaaPMRSinkHadoopConfigFields.USER_NAME);
+ if (isPresent(userName)) {
+ publisherConfigBuilder.setUserName(userName);
+ }
+
+ final String userPassword = configuration.get(DMaaPMRSinkHadoopConfigFields.USER_PASS);
+ if (isPresent(userPassword)) {
+ publisherConfigBuilder.setUserPassword(userPassword);
+ }
+
+ final String contentType = configuration.get(DMaaPMRSinkHadoopConfigFields.CONTENT_TYPE);
+ if (isPresent(contentType)) {
+ publisherConfigBuilder.setContentType(contentType);
+ }
+
+ final String maxBatchSize = configuration.get(DMaaPMRSinkHadoopConfigFields.MAX_BATCH_SIZE);
+ if (maxBatchSize != null) {
+ publisherConfigBuilder.setMaxBatchSize(Integer.parseInt(maxBatchSize));
+ }
+
+ final String maxRecoveryQueueSize = configuration.get(DMaaPMRSinkHadoopConfigFields.MAX_RECOVER_QUEUE_SIZE);
+ if (maxRecoveryQueueSize != null) {
+ publisherConfigBuilder.setMaxRecoveryQueueSize(Integer.parseInt(maxRecoveryQueueSize));
+ }
+
+ return publisherConfigBuilder.build();
+
+ }
+}
diff --git a/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSourceConfigMapper.java b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSourceConfigMapper.java
new file mode 100644
index 0000000..8717632
--- /dev/null
+++ b/dcae-analytics-cdap-plugins/src/main/java/org/openecomp/dcae/apod/analytics/cdap/plugins/utils/DMaaPSourceConfigMapper.java
@@ -0,0 +1,118 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.cdap.plugins.utils;
+
+import com.google.common.base.Function;
+import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;
+
+import javax.annotation.Nonnull;
+
+import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isEmpty;
+import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.isPresent;
+
+/**
+ * Function that converts {@link DMaaPMRSourcePluginConfig} to {@link DMaaPMRSubscriberConfig}
+ * <p>
+ * @author Rajiv Singla . Creation Date: 1/18/2017.
+ */
+public class DMaaPSourceConfigMapper implements Function<DMaaPMRSourcePluginConfig, DMaaPMRSubscriberConfig> {
+
+ /**
+ * Static factory method to map {@link DMaaPMRSourcePluginConfig} to {@link DMaaPMRSubscriberConfig}
+ *
+ * @param pluginConfig DMaaP MR Souce Plugin Config
+ *
+ * @return DMaaP MR Subscriber Config
+ */
+ public static DMaaPMRSubscriberConfig map(final DMaaPMRSourcePluginConfig pluginConfig) {
+ return new DMaaPSourceConfigMapper().apply(pluginConfig);
+ }
+
+ /**
+ * Converts {@link DMaaPMRSourcePluginConfig} to {@link DMaaPMRSubscriberConfig} object
+ *
+ * @param sourcePluginConfig DMaaP MR Source Plugin Config
+ *
+ * @return DMaaP MR Subscriber Config
+ */
+ @Nonnull
+ @Override
+ public DMaaPMRSubscriberConfig apply(@Nonnull DMaaPMRSourcePluginConfig sourcePluginConfig) {
+
+ // Create a new subscriber settings builder
+ final String hostName = sourcePluginConfig.getHostName();
+ final String topicName = sourcePluginConfig.getTopicName();
+ if (isEmpty(hostName) || isEmpty(topicName)) {
+ throw new IllegalStateException("DMaaP MR Source Host Name and Topic Name must be present");
+ }
+ final DMaaPMRSubscriberConfig.Builder subscriberConfigBuilder = new DMaaPMRSubscriberConfig.Builder(
+ hostName, topicName);
+
+ // Setup up any optional subscriber parameters if they are present
+ final Integer subscriberHostPortNumber = sourcePluginConfig.getPortNumber();
+ if (subscriberHostPortNumber != null) {
+ subscriberConfigBuilder.setPortNumber(subscriberHostPortNumber);
+ }
+
+ final String subscriberProtocol = sourcePluginConfig.getProtocol();
+ if (isPresent(subscriberProtocol)) {
+ subscriberConfigBuilder.setProtocol(subscriberProtocol);
+ }
+
+ final String subscriberUserName = sourcePluginConfig.getUserName();
+ if (isPresent(subscriberUserName)) {
+ subscriberConfigBuilder.setUserName(subscriberUserName);
+ }
+
+ final String subscriberUserPassword = sourcePluginConfig.getUserPassword();
+ if (isPresent(subscriberUserPassword)) {
+ subscriberConfigBuilder.setUserPassword(subscriberUserPassword);
+ }
+
+ final String subscriberContentType = sourcePluginConfig.getContentType();
+ if (isPresent(subscriberContentType)) {
+ subscriberConfigBuilder.setContentType(subscriberContentType);
+ }
+
+ final String subscriberConsumerId = sourcePluginConfig.getConsumerId();
+ if (isPresent(subscriberConsumerId)) {
+ subscriberConfigBuilder.setConsumerId(subscriberConsumerId);
+ }
+
+ final String subscriberConsumerGroup = sourcePluginConfig.getConsumerGroup();
+ if (isPresent(subscriberConsumerGroup)) {
+ subscriberConfigBuilder.setConsumerGroup(subscriberConsumerGroup);
+ }
+
+ final Integer subscriberTimeoutMS = sourcePluginConfig.getTimeoutMS();
+ if (subscriberTimeoutMS != null) {
+ subscriberConfigBuilder.setTimeoutMS(subscriberTimeoutMS);
+ }
+ final Integer subscriberMessageLimit = sourcePluginConfig.getMessageLimit();
+ if (subscriberMessageLimit != null) {
+ subscriberConfigBuilder.setMessageLimit(subscriberMessageLimit);
+ }
+
+ // return Subscriber config
+ return subscriberConfigBuilder.build();
+ }
+}