summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java
blob: 51d3168ee16ed6109c820070f3780a0900c06a8a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
/*
 * ============LICENSE_START=======================================================
 * ONAP : DataLake
 * ================================================================================
 * Copyright 2019 China Mobile
 *=================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.datalake.feeder.util;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import lombok.Getter;
import org.apache.velocity.Template;
import org.apache.velocity.VelocityContext;
import org.apache.velocity.app.Velocity;
import org.apache.velocity.runtime.RuntimeConstants;
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.net.MalformedURLException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;


/*
 * Reads a sample JSON message and outputs a Druid supervisor spec to resources/druid/generated.
 * The generated files need manual editing to be production ready; final versions are in resources/druid.
 *
 * http://druid.io/docs/latest/tutorials/tutorial-ingestion-spec.html
 * http://druid.io/docs/latest/ingestion/flatten-json
 *
 *
 * todo:
 * reduce the manual editing
 * paths are hard coded
 * auto-discover topics
 * auto-fetch samples, and for each topic fetch multiple samples
 * make supervisor file names consistent
 * dimension type defaults to string; in msgrtr.apinode.metrics.dmaap many fields are long/double, so dimensionsSpec needs to be generated — this is done at the end of printFlattenSpec()
 */

@Getter
public class DruidSupervisorGenerator {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    // Velocity template and context used to render the Kafka supervisor spec.
    private final Template template;
    private final VelocityContext context;

    // Flattened JSON dimensions collected while walking a sample message;
    // each entry is {name, jsonPath}. Rebuilt per topic in doTopic().
    private final List<String[]> dimensions;

    /**
     * Initializes the Velocity engine with a classpath resource loader and
     * loads the shared kafka-supervisor template.
     */
    public DruidSupervisorGenerator() {
        dimensions = new ArrayList<>();

        Velocity.setProperty(RuntimeConstants.RESOURCE_LOADER, "classpath");
        Velocity.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName());

        Velocity.init();

        context = new VelocityContext();

        context.put("host", "message-router-kafka:9092");//TODO get from config

        template = Velocity.getTemplate("druid/kafka-supervisor-template.vm");
    }

    /**
     * Recursively walks a JSON tree; container nodes recurse into their fields
     * (and array elements), leaf nodes are emitted as flatten-spec entries.
     *
     * @param prefix JSONPath-style prefix accumulated so far (starts at "$")
     * @param node   current node of the sample message tree
     */
    private void printNode(String prefix, JsonNode node) {

        if (node.isContainerNode()) {

            Iterator<Entry<String, JsonNode>> fields = node.fields();

            while (fields.hasNext()) {
                Entry<String, JsonNode> field = fields.next();
                printNode(prefix + "." + field.getKey(), field.getValue());
            }

            if (node.isArray()) {
                Iterator<JsonNode> elements = node.elements();
                int i = 0;
                while (elements.hasNext()) {
                    JsonNode element = elements.next();
                    printNode(prefix + "[" + i + "]", element);
                    i++;
                }
            }

        } else {
            printFlattenSpec(node.getNodeType(), prefix);
        }

    }

    /**
     * Logs one Druid flatten-spec entry for a leaf node and records the
     * dimension for later template rendering.
     *
     * @param type node type of the leaf; currently unused, kept for the planned
     *             dimensionsSpec type generation (see class-level todo)
     * @param path JSONPath expression of the leaf, e.g. "$.event-header.id"
     */
    private void printFlattenSpec(JsonNodeType type, String path) {
        // Strip the leading "$." and use ':' as the flattened-name separator.
        String name = path.substring(2).replace('.', ':');
        log.info("{");
        log.info("\"type\": \"path\",");
        log.info("\"name\": \"{}\",", name);
        log.info("\"expr\": \"{}\"", path);
        log.info("},");

        dimensions.add(new String[]{name, path});
    }

    /**
     * Generates a Kafka supervisor spec for one topic: reads the topic's sample
     * message, derives its dimensions, and renders the template to
     * src/main/resources/druid/generated/&lt;topic&gt;-kafka-supervisor.json.
     *
     * @param topic Kafka topic name; also used to locate the sample file
     * @throws IOException if the sample cannot be read, its JSON cannot be
     *                     parsed, or the output file cannot be written
     */
    public void doTopic(String topic) throws IOException {
        dimensions.clear();

        String sampleFileName = "src/main/resources/druid/" + topic + "-sample-format.json";//FIXME hard coded path
        String outputFileName = "src/main/resources/druid/generated/" + topic + "-kafka-supervisor.json";

        String sampleJson = Util.getTextFromFile(sampleFileName);

        // Parse the sample into a tree and collect dimensions via printNode().
        ObjectMapper mapper = new ObjectMapper();
        JsonNode root = mapper.readTree(sampleJson);
        printNode("$", root);

        context.put("topic", topic);
        context.put("timestamp", "event-header:timestamp");//FIXME hard coded, should be topic based
        context.put("timestampFormat", "yyyyMMdd-HH:mm:ss:SSS");//FIXME hard coded, should be topic based
        context.put("dimensions", dimensions);

        // try-with-resources guarantees the writer is closed even if merge()
        // throws; explicit UTF-8 avoids platform-default-charset output.
        try (BufferedWriter out = Files.newBufferedWriter(Paths.get(outputFileName), StandardCharsets.UTF_8)) {
            template.merge(context, out);
        }
    }

    /**
     * Generates supervisor specs for a fixed set of topics.
     * (MalformedURLException is a subtype of IOException, so a single throws
     * clause suffices.)
     */
    public static void main(String[] args) throws IOException {
        String[] topics = new String[]{"AAI-EVENT", "msgrtr.apinode.metrics.dmaap", "unauthenticated.DCAE_CL_OUTPUT", "unauthenticated.SEC_FAULT_OUTPUT"};//FIXME hard coded

        DruidSupervisorGenerator p = new DruidSupervisorGenerator();

        for (String topic : topics) {
            p.doTopic(topic);
        }
    }
}