Skip to content

Commit a458e1a

Browse files
committed
PARQUET-243: Add Avro reflect support
Author: Ryan Blue <blue@apache.org> Closes #165 from rdblue/PARQUET-243-add-avro-reflect and squashes the following commits: a1a17b4 [Ryan Blue] PARQUET-243: Update for Tom's review comments. 16584d1 [Ryan Blue] PARQUET-243: Fix AvroWriteSupport bug. fa4a9ec [Ryan Blue] PARQUET-243: Add reflect tests. 4c50cd1 [Ryan Blue] PARQUET-243: Update write support for reflected objects. b50c482 [Ryan Blue] PARQUET-243: Update tests to run with new converters. 0b7a333 [Ryan Blue] PARQUET-243: Use common AvroConverters where possible. 2f6825d [Ryan Blue] PARQUET-243: Add reflect converters that behave more like Avro. 98f10df [Ryan Blue] PARQUET-243: Add Avro compatible record materializer.
1 parent 60edcf9 commit a458e1a

21 files changed

Lines changed: 2482 additions & 275 deletions

parquet-avro/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@
5757
<artifactId>avro</artifactId>
5858
<version>${avro.version}</version>
5959
</dependency>
60+
<dependency>
61+
<groupId>it.unimi.dsi</groupId>
62+
<artifactId>fastutil</artifactId>
63+
<version>${fastutil.version}</version>
64+
</dependency>
6065
<dependency>
6166
<groupId>org.apache.hadoop</groupId>
6267
<artifactId>hadoop-client</artifactId>
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.avro;
20+
21+
import org.apache.avro.Schema;
22+
import org.apache.avro.generic.GenericData;
23+
import org.apache.avro.generic.IndexedRecord;
24+
import org.apache.parquet.io.api.GroupConverter;
25+
import org.apache.parquet.io.api.RecordMaterializer;
26+
import org.apache.parquet.schema.MessageType;
27+
28+
class AvroCompatRecordMaterializer<T extends IndexedRecord> extends RecordMaterializer<T> {
29+
30+
private AvroIndexedRecordConverter<T> root;
31+
32+
public AvroCompatRecordMaterializer(MessageType requestedSchema, Schema avroSchema,
33+
GenericData baseModel) {
34+
this.root = new AvroIndexedRecordConverter<T>(requestedSchema, avroSchema, baseModel);
35+
}
36+
37+
@Override
38+
public T getCurrentRecord() {
39+
return root.getCurrentRecord();
40+
}
41+
42+
@Override
43+
public GroupConverter getRootConverter() {
44+
return root;
45+
}
46+
}
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.avro;
20+
21+
import java.nio.ByteBuffer;
22+
import org.apache.avro.Schema;
23+
import org.apache.avro.generic.GenericData;
24+
import org.apache.parquet.column.Dictionary;
25+
import org.apache.parquet.io.api.Binary;
26+
import org.apache.parquet.io.api.GroupConverter;
27+
import org.apache.parquet.io.api.PrimitiveConverter;
28+
29+
public class AvroConverters {
30+
31+
public abstract static class AvroGroupConverter extends GroupConverter {
32+
protected final ParentValueContainer parent;
33+
34+
public AvroGroupConverter(ParentValueContainer parent) {
35+
this.parent = parent;
36+
}
37+
}
38+
39+
static class AvroPrimitiveConverter extends PrimitiveConverter {
40+
protected final ParentValueContainer parent;
41+
42+
public AvroPrimitiveConverter(ParentValueContainer parent) {
43+
this.parent = parent;
44+
}
45+
}
46+
47+
static final class FieldByteConverter extends AvroPrimitiveConverter {
48+
public FieldByteConverter(ParentValueContainer parent) {
49+
super(parent);
50+
}
51+
52+
@Override
53+
public void addInt(int value) {
54+
parent.addByte((byte) value);
55+
}
56+
}
57+
static final class FieldShortConverter extends AvroPrimitiveConverter {
58+
public FieldShortConverter(ParentValueContainer parent) {
59+
super(parent);
60+
}
61+
62+
@Override
63+
public void addInt(int value) {
64+
parent.addShort((short) value);
65+
}
66+
}
67+
68+
static final class FieldCharConverter extends AvroPrimitiveConverter {
69+
public FieldCharConverter(ParentValueContainer parent) {
70+
super(parent);
71+
}
72+
73+
@Override
74+
public void addInt(int value) {
75+
parent.addChar((char) value);
76+
}
77+
}
78+
79+
static final class FieldBooleanConverter extends AvroPrimitiveConverter {
80+
public FieldBooleanConverter(ParentValueContainer parent) {
81+
super(parent);
82+
}
83+
84+
@Override
85+
final public void addBoolean(boolean value) {
86+
parent.addBoolean(value);
87+
}
88+
}
89+
90+
static final class FieldIntegerConverter extends AvroPrimitiveConverter {
91+
public FieldIntegerConverter(ParentValueContainer parent) {
92+
super(parent);
93+
}
94+
95+
@Override
96+
final public void addInt(int value) {
97+
parent.addInt(value);
98+
}
99+
}
100+
101+
static final class FieldLongConverter extends AvroPrimitiveConverter {
102+
public FieldLongConverter(ParentValueContainer parent) {
103+
super(parent);
104+
}
105+
106+
@Override
107+
final public void addInt(int value) {
108+
parent.addLong((long) value);
109+
}
110+
111+
@Override
112+
final public void addLong(long value) {
113+
parent.addLong(value);
114+
}
115+
}
116+
117+
static final class FieldFloatConverter extends AvroPrimitiveConverter {
118+
public FieldFloatConverter(ParentValueContainer parent) {
119+
super(parent);
120+
}
121+
122+
@Override
123+
final public void addInt(int value) {
124+
parent.addFloat((float) value);
125+
}
126+
127+
@Override
128+
final public void addLong(long value) {
129+
parent.addFloat((float) value);
130+
}
131+
132+
@Override
133+
final public void addFloat(float value) {
134+
parent.addFloat(value);
135+
}
136+
137+
}
138+
139+
static final class FieldDoubleConverter extends AvroPrimitiveConverter {
140+
public FieldDoubleConverter(ParentValueContainer parent) {
141+
super(parent);
142+
}
143+
144+
@Override
145+
final public void addInt(int value) {
146+
parent.addDouble((double) value);
147+
}
148+
149+
@Override
150+
final public void addLong(long value) {
151+
parent.addDouble((double) value);
152+
}
153+
154+
@Override
155+
final public void addFloat(float value) {
156+
parent.addDouble((double) value);
157+
}
158+
159+
@Override
160+
final public void addDouble(double value) {
161+
parent.addDouble(value);
162+
}
163+
}
164+
165+
static final class FieldByteArrayConverter extends AvroPrimitiveConverter {
166+
public FieldByteArrayConverter(ParentValueContainer parent) {
167+
super(parent);
168+
}
169+
170+
@Override
171+
final public void addBinary(Binary value) {
172+
parent.add(value.getBytes());
173+
}
174+
}
175+
176+
static final class FieldByteBufferConverter extends AvroPrimitiveConverter {
177+
public FieldByteBufferConverter(ParentValueContainer parent) {
178+
super(parent);
179+
}
180+
181+
@Override
182+
final public void addBinary(Binary value) {
183+
parent.add(ByteBuffer.wrap(value.getBytes()));
184+
}
185+
}
186+
187+
static final class FieldStringConverter extends AvroPrimitiveConverter {
188+
// TODO: dictionary support should be generic and provided by a parent
189+
// TODO: this always produces strings, but should respect avro.java.string
190+
private String[] dict;
191+
192+
public FieldStringConverter(ParentValueContainer parent) {
193+
super(parent);
194+
}
195+
196+
@Override
197+
final public void addBinary(Binary value) {
198+
parent.add(value.toStringUsingUTF8());
199+
}
200+
201+
@Override
202+
public boolean hasDictionarySupport() {
203+
return true;
204+
}
205+
206+
@Override
207+
public void setDictionary(Dictionary dictionary) {
208+
dict = new String[dictionary.getMaxId() + 1];
209+
for (int i = 0; i <= dictionary.getMaxId(); i++) {
210+
dict[i] = dictionary.decodeToBinary(i).toStringUsingUTF8();
211+
}
212+
}
213+
214+
@Override
215+
public void addValueFromDictionary(int dictionaryId) {
216+
parent.add(dict[dictionaryId]);
217+
}
218+
}
219+
220+
static final class FieldEnumConverter extends AvroPrimitiveConverter {
221+
private final Schema schema;
222+
private final GenericData model;
223+
224+
public FieldEnumConverter(ParentValueContainer parent, Schema enumSchema,
225+
GenericData model) {
226+
super(parent);
227+
this.schema = enumSchema;
228+
this.model = model;
229+
}
230+
231+
@Override
232+
final public void addBinary(Binary value) {
233+
parent.add(model.createEnum(value.toStringUsingUTF8(), schema));
234+
}
235+
}
236+
237+
static final class FieldFixedConverter extends AvroPrimitiveConverter {
238+
private final Schema schema;
239+
private final GenericData model;
240+
241+
public FieldFixedConverter(ParentValueContainer parent, Schema avroSchema,
242+
GenericData model) {
243+
super(parent);
244+
this.schema = avroSchema;
245+
this.model = model;
246+
}
247+
248+
@Override
249+
final public void addBinary(Binary value) {
250+
parent.add(model.createFixed(null /* reuse */, value.getBytes(), schema));
251+
}
252+
}
253+
}

0 commit comments

Comments
 (0)