aboutsummaryrefslogtreecommitdiffstats
path: root/dcae-analytics-cdap-plugins/src/main/java/org/onap/dcae/apod/analytics/cdap/plugins/utils/CDAPPluginUtils.java
blob: 657f0af41b67ed29eda3508c119c5a40eba4aa13 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
/*
 * ===============================LICENSE_START======================================
 *  dcae-analytics
 * ================================================================================
 *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *          http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *  ============================LICENSE_END===========================================
 */

package org.onap.dcae.apod.analytics.cdap.plugins.utils;

import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.etl.api.PipelineConfigurer;
import com.google.common.base.Function;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
import org.onap.dcae.apod.analytics.cdap.plugins.domain.schema.dmaap.DMaaPSourceOutputSchema;
import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
import org.onap.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
 * @author Rajiv Singla . Creation Date: 1/26/2017.
 */
public abstract class CDAPPluginUtils extends AnalyticsModelJsonUtils {

    private static final Logger LOG = LoggerFactory.getLogger(CDAPPluginUtils.class);

    public static final Function<Schema, Schema.Type> SCHEMA_TO_TYPE_FUNCTION = new Function<Schema, Schema.Type>() {
        @Override
        public Schema.Type apply(Schema schema) {
            return schema.getType();
        }
    };



    private CDAPPluginUtils() {
        // private constructor
    }

    /**
     * Validates if CDAP Schema contains expected fields
     *
     * @param schema schema that need to be validated
     * @param expectedFields fields that are expected to be in the schema
     */

    public static void validateSchemaContainsFields(@Nullable final Schema schema, final String... expectedFields) {

        LOG.debug("Validating schema:{} contains expected fields:{}", schema, Arrays.toString(expectedFields));

        if (schema == null) {
            // If input schema is null then no validation possible
            LOG.warn("Input Schema is null. No validation possible");
        } else {
            // Check if expected fields are indeed present in the schema
            for (String expectedField : expectedFields) {
                final Schema.Field schemaField = schema.getField(expectedField);
                if (schemaField == null) {
                    final String errorMessage = String.format(
                            "Unable to find expected field: %s, in schema: %s", expectedField, schema);
                    throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
                }
            }
            LOG.debug("Successfully validated schema:{}, contains expected fields:{}", schema,
                    Arrays.toString(expectedFields));
        }
    }


    /**
     * Creates a new Structured Record containing DMaaP MR fetched message
     *
     * @param message DMaaP MR fetch message
     *
     * @return Structured record containing DMaaP MR Message
     */
    public static StructuredRecord createDMaaPMRResponseStructuredRecord(final String message) {
        StructuredRecord.Builder recordBuilder = StructuredRecord.builder(DMaaPSourceOutputSchema.getSchema());
        recordBuilder
                .set(DMaaPSourceOutputSchema.TIMESTAMP.getSchemaColumnName(), System.nanoTime())
                .set(DMaaPSourceOutputSchema.RESPONSE_CODE.getSchemaColumnName(), 200)
                .set(DMaaPSourceOutputSchema.RESPONSE_MESSAGE.getSchemaColumnName(), "OK")
                .set(DMaaPSourceOutputSchema.FETCHED_MESSAGE.getSchemaColumnName(), message);
        return recordBuilder.build();
    }


    /**
     * Creates output StructuredRecord Builder which has copied values from input StructuredRecord
     *
     * @param outputSchema output Schema
     * @param inputStructuredRecord input Structured Record
     *
     * @return output Structured Record builder with pre populated values from input structured record
     */
    public static StructuredRecord.Builder createOutputStructuredRecordBuilder(
            @Nonnull final Schema outputSchema,
            @Nonnull final StructuredRecord inputStructuredRecord) {

        // Get input structured Record Schema
        final Schema inputSchema = inputStructuredRecord.getSchema();
        // Create new instance of output Structured Record Builder from output Schema
        final StructuredRecord.Builder outputStructuredRecordBuilder = StructuredRecord.builder(outputSchema);

        // iterate over input fields and if output schema has field with same name copy the value to out record builder
        for (Schema.Field inputField : inputSchema.getFields()) {
            final String inputFieldName = inputField.getName();
            if (outputSchema.getField(inputFieldName) != null) {
                outputStructuredRecordBuilder.set(inputFieldName, inputStructuredRecord.get(inputFieldName));
            }
        }

        return outputStructuredRecordBuilder;
    }


    /**
     * Adds Field value to StructuredRecord Builder if schema contains that field Name
     *
     * @param structuredRecordBuilder structured record builder
     * @param structuredRecordSchema schema for structured record builder
     * @param fieldName field name
     * @param fieldValue field value
     *
     * @return structured record builder with populated field name and value if schema contains field name
     */
    public static StructuredRecord.Builder addFieldValueToStructuredRecordBuilder(
            @Nonnull final StructuredRecord.Builder structuredRecordBuilder,
            @Nonnull final Schema structuredRecordSchema,
            @Nonnull final String fieldName,
            final Object fieldValue) {

        // check if schema contains field Name
        if (structuredRecordSchema.getField(fieldName) != null) {
            structuredRecordBuilder.set(fieldName, fieldValue);
        } else {
            LOG.info("Unable to populate value for field Name: {} with field value: {}. " +
                            "Schema Fields: {} does not contain field name: {}",
                    fieldName, fieldValue, structuredRecordSchema.getFields(), fieldName);
        }

        return structuredRecordBuilder;
    }


    /**
     * Validates that given schema String has fieldName of expected type. If field does not exist in given schema
     * then validation will pass with warning. If field does exist in given schema then this validation will return
     * true if field type is same as expected type else false
     *
     * @param schemaString CDAP Plugin output or input schema string
     * @param fieldName field name
     * @param expectedFieldType expected schema field type
     *
     * @return true if field type matches expected field type else false. If field does not exist in
     * give schema validation will pass but will generate a warning message
     */
    public static boolean validateSchemaFieldType(@Nonnull final String schemaString,
                                                  @Nonnull final String fieldName,
                                                  @Nonnull final Schema.Type expectedFieldType) {

        try {
            // parse given schema String
            final Schema outputSchema = Schema.parseJson(schemaString);
            final Schema.Field schemaField = outputSchema.getField(fieldName);

            // if given schema does contain field then validated fieldName type
            if (schemaField != null) {

                final List<Schema> schemas = new LinkedList<>();

                // if it is a union type then grab all union schemas
                if (outputSchema.getField(fieldName).getSchema().getType() == Schema.Type.UNION) {
                    final List<Schema> unionFieldSchemas =
                            outputSchema.getField(fieldName).getSchema().getUnionSchemas();
                    schemas.addAll(unionFieldSchemas);
                } else {
                    // if not union type the just get the field schema
                    final Schema fieldSchema = outputSchema.getField(fieldName).getSchema();
                    schemas.add(fieldSchema);
                }

                // get all schema types
                final List<Schema.Type> fieldTypes =
                        Lists.transform(schemas, CDAPPluginUtils.SCHEMA_TO_TYPE_FUNCTION);

                // if all schema types does not contain expected field type then return false
                if (!fieldTypes.contains(expectedFieldType)) {
                    LOG.error("Validation failed for fieldName: {} is NOT of expected Type: {} in schema: {}",
                            fieldName, expectedFieldType, outputSchema);
                    return false;
                }

                // field type validation passed
                LOG.debug("Successfully validated fieldName: {} is of expected Type: {}",
                        fieldName, expectedFieldType);

                return true;

            } else {

                // if field does not exist then the validation will pass but will generate warning message
                LOG.warn("Validation of field type not possible. Field name: {} does not exist in schema: {}",
                        fieldName, outputSchema);
                return true;
            }

        } catch (IOException e) {
            final String errorMessage =
                    String.format("Unable to parse schema: %s for field type validation. " +
                                    "Field Name: %s, Expected Field Type: %s Exception: %s",
                            schemaString, fieldName, expectedFieldType, e);
            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
        }

    }


    /**
     * Parses provided schema String as Schema object and set it as output Schema format
     *
     * @param pipelineConfigurer plugin pipeline configurer
     * @param schemaString schema String to be set as output schema
     */
    public static void setOutputSchema(final PipelineConfigurer pipelineConfigurer, final String schemaString) {
        try {
            final Schema outputSchema = Schema.parseJson(schemaString);
            pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);
        } catch (IOException e) {
            final String errorMessage = String.format(
                    "Schema specified is not a valid JSON. Schema String: %s, Exception: %s", schemaString, e);
            throw new CDAPSettingsException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
        }
    }


    /**
     * Parses incoming plugin config mapping to key value map. If any of the key value map is blank an Illegal Argument
     * exception will be thrown
     *
     * @param mappingFieldString field Mapping String
     *
     * @return map containing mapping key values
     */
    public static Map<String, String> extractFieldMappings(final String mappingFieldString) {
        final Map<String, String> fieldMappings = Maps.newHashMap();
        if (StringUtils.isNotBlank(mappingFieldString)) {
            final Splitter commaSplitter = Splitter.on(",");
            for (String fieldMapping : commaSplitter.split(mappingFieldString)) {
                final String[] keyValueMappings = fieldMapping.split(":");
                if (keyValueMappings.length != 2 ||
                        StringUtils.isBlank(keyValueMappings[0]) ||
                        StringUtils.isBlank(keyValueMappings[1])) {
                    final String errorMessage = "Field Mapping key or value is Blank. All field mappings must " +
                            "be present in mappings: " + mappingFieldString;
                    throw new DCAEAnalyticsRuntimeException(
                            errorMessage, LOG, new IllegalArgumentException(errorMessage));
                }
                fieldMappings.put(keyValueMappings[0].trim(), keyValueMappings[1].trim());
            }
        }
        return fieldMappings;
    }




}