aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/aai/dbgen/GraphSONPartialReader.java
blob: ebe2180176333b9ab252b3e3059f28719631661d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
/**
 * ============LICENSE_START=======================================================
 * org.onap.aai
 * ================================================================================
 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.aai.dbgen;

import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.io.GraphReader;
import org.apache.tinkerpop.gremlin.structure.io.GraphWriter;
import org.apache.tinkerpop.gremlin.structure.io.Mapper;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONReader;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONVersion;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
import org.apache.tinkerpop.gremlin.structure.util.Attachable;
import org.apache.tinkerpop.gremlin.structure.util.Host;
import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
import org.apache.tinkerpop.gremlin.util.function.FunctionUtils;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.apache.tinkerpop.shaded.jackson.core.type.TypeReference;
import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
import org.apache.tinkerpop.shaded.jackson.databind.node.JsonNodeType;
import org.onap.aai.dbmap.InMemoryGraph;

import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Stream;

/**
 * This is a Wrapper around the GraphsonReader class
 * The idea is to rewrite methods that are customized for A&AI
 * GraphsonReader is a final class . hence the use of the Wrapper
 * instead of inheriting-overwriting
 *
 * 
 */
public final class GraphSONPartialReader implements GraphReader {
    private final ObjectMapper mapper ;
    private final long batchSize ;
    private final GraphSONVersion version ;
    private boolean unwrapAdjacencyList = false;
    private final GraphSONReader reader;
    
    private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(InMemoryGraph.class);

    final TypeReference<Map<String, Object>> mapTypeReference = new TypeReference<Map<String, Object>>() {
    };

    private GraphSONPartialReader(final Builder builder) {
        mapper = builder.mapper.createMapper();
        batchSize = builder.batchSize;
        unwrapAdjacencyList = builder.unwrapAdjacencyList;
        version = ((GraphSONMapper)builder.mapper).getVersion();
        reader = GraphSONReader.build().create();
    }

    /**
     * Read data into a {@link Graph} from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
     * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
     *
     * @param inputStream a stream containing an entire graph of vertices and edges as defined by the accompanying
     *                    {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
     * @param graphToWriteTo the graph to write to when reading from the stream.
     */
    @Override
    public void readGraph(final InputStream inputStream, final Graph graphToWriteTo) throws IOException {
        // dual pass - create all vertices and store to cache the ids.  then create edges.  as long as we don't
        // have vertex labels in the output we can't do this single pass
    	LOGGER.info("Read the Partial Graph");
    	final Map<StarGraph.StarVertex,Vertex> cache = new HashMap<>();
        final AtomicLong counter = new AtomicLong(0);
        
        final boolean supportsTx = graphToWriteTo.features().graph().supportsTransactions();
        final Graph.Features.EdgeFeatures edgeFeatures = graphToWriteTo.features().edge();
        
        readVertexStrings(inputStream).<Vertex>map(FunctionUtils.wrapFunction(line -> readVertex(new ByteArrayInputStream(line.getBytes()), null, null, Direction.IN))).forEach(vertex -> {
        	try{
        		final Attachable<Vertex> attachable = (Attachable<Vertex>) vertex;
	            cache.put((StarGraph.StarVertex) attachable.get(), attachable.attach(Attachable.Method.create(graphToWriteTo)));
	            if (supportsTx && counter.incrementAndGet() % batchSize == 0)
	                graphToWriteTo.tx().commit();
        	}
        	catch(Exception ex){
        		LOGGER.info("Error in reading vertex from graphson"+vertex.toString());
        	}
        });
        
        cache.entrySet().forEach(kv -> kv.getKey().edges(Direction.IN).forEachRemaining(e -> {
        	try{
	        	// can't use a standard Attachable attach method here because we have to use the cache for those
	            // graphs that don't support userSuppliedIds on edges.  note that outVertex/inVertex methods return
	            // StarAdjacentVertex whose equality should match StarVertex.
	            final Vertex cachedOutV = cache.get(e.outVertex());
	            final Vertex cachedInV = cache.get(e.inVertex());
	            
	            if(cachedOutV != null  && cachedInV != null){
	            	 
		            final Edge newEdge = edgeFeatures.willAllowId(e.id()) ? cachedOutV.addEdge(e.label(), cachedInV, T.id, e.id()) : cachedOutV.addEdge(e.label(), cachedInV);
		            e.properties().forEachRemaining(p -> newEdge.property(p.key(), p.value()));
		        }
	            else{
	            	LOGGER.debug("Ghost edges from "+ cachedOutV + " to "+ cachedInV);
	            	
	            }
	            if (supportsTx && counter.incrementAndGet() % batchSize == 0)
	                graphToWriteTo.tx().commit();
        	}
        	catch(Exception ex){
        		LOGGER.info("Error in writing vertex into graph"+e.toString());
        	}
        }));

        if (supportsTx) graphToWriteTo.tx().commit();
    }

    /**
     * Read {@link Vertex} objects from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
     * {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
     *
     * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
     *                    {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} or
     *                    {@link GraphWriter#writeVertices(OutputStream, Iterator)} methods.
     * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
     * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
     * @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
     */
    @Override
    public Iterator<Vertex> readVertices(final InputStream inputStream,
                                         final Function<Attachable<Vertex>, Vertex> vertexAttachMethod,
                                         final Function<Attachable<Edge>, Edge> edgeAttachMethod,
                                         final Direction attachEdgesOfThisDirection) throws IOException {
       // return readVertexStrings(inputStream).<Vertex>map(FunctionUtils.wrapFunction(line -> readVertex(new ByteArrayInputStream(line.getBytes()), vertexAttachMethod, edgeAttachMethod, attachEdgesOfThisDirection))).iterator();
        return reader.readVertices(inputStream, vertexAttachMethod, edgeAttachMethod, attachEdgesOfThisDirection);
        		
    }

    /**
     * Read a {@link Vertex}  from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
     * {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
     *
     * @param inputStream a stream containing at least a single vertex as defined by the accompanying
     *                    {@link GraphWriter#writeVertex(OutputStream, Vertex)}.
     * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
     */
    @Override
    public Vertex readVertex(final InputStream inputStream, final Function<Attachable<Vertex>, Vertex> vertexAttachMethod) throws IOException {
        return reader.readVertex(inputStream, vertexAttachMethod);
    }

    /**
     * Read a {@link Vertex} from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
     * {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
     *
     * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
     *                    {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} method.
     * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
     * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
     * @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
     */
    @Override
    public Vertex readVertex(final InputStream inputStream,
                             final Function<Attachable<Vertex>, Vertex> vertexAttachMethod,
                             final Function<Attachable<Edge>, Edge> edgeAttachMethod,
                             final Direction attachEdgesOfThisDirection) throws IOException {
    	
    	return reader.readVertex(inputStream, vertexAttachMethod, edgeAttachMethod, attachEdgesOfThisDirection);
    }

    /**
     * Read an {@link Edge} from output generated by {@link GraphSONWriter#writeEdge(OutputStream, Edge)} or via
     * an {@link Edge} passed to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
     *
     * @param inputStream a stream containing at least one {@link Edge} as defined by the accompanying
     *                    {@link GraphWriter#writeEdge(OutputStream, Edge)} method.
     * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
     */
    @Override
    public Edge readEdge(final InputStream inputStream, final Function<Attachable<Edge>, Edge> edgeAttachMethod) throws IOException {
        /*if (version == GraphSONVersion.V1_0) {
            final Map<String, Object> edgeData = mapper.readValue(inputStream, mapTypeReference);

            final Map<String, Object> edgeProperties = edgeData.containsKey(GraphSONTokens.PROPERTIES) ?
                    (Map<String, Object>) edgeData.get(GraphSONTokens.PROPERTIES) : Collections.EMPTY_MAP;
            final DetachedEdge edge = new DetachedEdge(edgeData.get(GraphSONTokens.ID),
                    edgeData.get(GraphSONTokens.LABEL).toString(),
                    edgeProperties,
                    Pair.with(edgeData.get(GraphSONTokens.OUT), edgeData.get(GraphSONTokens.OUT_LABEL).toString()),
                    Pair.with(edgeData.get(GraphSONTokens.IN), edgeData.get(GraphSONTokens.IN_LABEL).toString()));

            return edgeAttachMethod.apply(edge);
        } else {
            return edgeAttachMethod.apply((DetachedEdge) mapper.readValue(inputStream, Edge.class));
        }*/
    	return reader.readEdge(inputStream, edgeAttachMethod);
    }

    /**
     * Read a {@link VertexProperty} from output generated by
     * {@link GraphSONWriter#writeVertexProperty(OutputStream, VertexProperty)} or via an {@link VertexProperty} passed
     * to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
     *
     * @param inputStream a stream containing at least one {@link VertexProperty} as written by the accompanying
     *                    {@link GraphWriter#writeVertexProperty(OutputStream, VertexProperty)} method.
     * @param vertexPropertyAttachMethod a function that creates re-attaches a {@link VertexProperty} to a
     *                                   {@link Host} object.
     */
    @Override
    public VertexProperty readVertexProperty(final InputStream inputStream,
                                             final Function<Attachable<VertexProperty>, VertexProperty> vertexPropertyAttachMethod) throws IOException {
        /*if (version == GraphSONVersion.V1_0) {
            final Map<String, Object> vpData = mapper.readValue(inputStream, mapTypeReference);
            final Map<String, Object> metaProperties = (Map<String, Object>) vpData.get(GraphSONTokens.PROPERTIES);
            final DetachedVertexProperty vp = new DetachedVertexProperty(vpData.get(GraphSONTokens.ID),
                    vpData.get(GraphSONTokens.LABEL).toString(),
                    vpData.get(GraphSONTokens.VALUE), metaProperties);
            return vertexPropertyAttachMethod.apply(vp);
        } else {
            return vertexPropertyAttachMethod.apply((DetachedVertexProperty) mapper.readValue(inputStream, VertexProperty.class));
        }*/
    	return reader.readVertexProperty(inputStream, vertexPropertyAttachMethod);
    }

    /**
     * Read a {@link Property} from output generated by  {@link GraphSONWriter#writeProperty(OutputStream, Property)} or
     * via an {@link Property} passed to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
     *
     * @param inputStream a stream containing at least one {@link Property} as written by the accompanying
     *                    {@link GraphWriter#writeProperty(OutputStream, Property)} method.
     * @param propertyAttachMethod a function that creates re-attaches a {@link Property} to a {@link Host} object.
     */
    @Override
    public Property readProperty(final InputStream inputStream,
                                 final Function<Attachable<Property>, Property> propertyAttachMethod) throws IOException {
        /*if (version == GraphSONVersion.V1_0) {
            final Map<String, Object> propertyData = mapper.readValue(inputStream, mapTypeReference);
            final DetachedProperty p = new DetachedProperty(propertyData.get(GraphSONTokens.KEY).toString(), propertyData.get(GraphSONTokens.VALUE));
            return propertyAttachMethod.apply(p);
        } else {
            return propertyAttachMethod.apply((DetachedProperty) mapper.readValue(inputStream, Property.class));
        }*/
    	return reader.readProperty(inputStream, propertyAttachMethod);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public <C> C readObject(final InputStream inputStream, final Class<? extends C> clazz) throws IOException {
        return mapper.readValue(inputStream, clazz);
    }

    private Stream<String> readVertexStrings(final InputStream inputStream) throws IOException {
        if (unwrapAdjacencyList) {
        	final JsonNode root = mapper.readTree(inputStream);
            final JsonNode vertices = root.get(GraphSONTokens.VERTICES);
            if (!vertices.getNodeType().equals(JsonNodeType.ARRAY)) throw new IOException(String.format("The '%s' key must be an array", GraphSONTokens.VERTICES));
            return IteratorUtils.stream(vertices.elements()).map(Object::toString);
        } else {
        	final BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
            return br.lines();
        }
    	
    }

    
    public static Builder build() {
        return new Builder();
    }

    public final static class Builder implements ReaderBuilder<GraphSONPartialReader> {
        private long batchSize = 10000;

        private Mapper<ObjectMapper> mapper = GraphSONMapper.build().create();
        private boolean unwrapAdjacencyList = false;
        

        private Builder() {}

        /**
         * Number of mutations to perform before a commit is executed when using
         * {@link GraphSONPartialReader#readGraph(InputStream, Graph)}.
         */
        public Builder batchSize(final long batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        /**
         * Override all of the {@link GraphSONMapper} builder
         * options with this mapper.  If this value is set to something other than null then that value will be
         * used to construct the writer.
         */
        public Builder mapper(final Mapper<ObjectMapper> mapper) {
            this.mapper = mapper;
            return this;
        }

        /**
         * If the adjacency list is wrapped in a JSON object, as is done when writing a graph with
         * {@link GraphSONWriter.Builder#wrapAdjacencyList} set to {@code true}, this setting needs to be set to
         * {@code true} to properly read it.  By default, this value is {@code false} and the adjacency list is
         * simply read as line delimited vertices.
         * <p/>
         * By setting this value to {@code true}, the generated JSON is no longer "splittable" by line and thus not
         * suitable for OLAP processing.  Furthermore, reading this format of the JSON with
         * {@link GraphSONPartialReader#readGraph(InputStream, Graph)} or
         * {@link GraphSONPartialReader#readVertices(InputStream, Function, Function, Direction)} requires that the
         * entire JSON object be read into memory, so it is best saved for "small" graphs.
         */
        public Builder unwrapAdjacencyList(final boolean unwrapAdjacencyList) {
            this.unwrapAdjacencyList = unwrapAdjacencyList;
            return this;
        }

        public GraphSONPartialReader create() {
            return new GraphSONPartialReader(this);
        }
    }
}