Skip to content

Commit f086b52

Browse files
committed
Use and record writer time zone in ORC files
Early versions of the Apache ORC writer made the mistake of recording timestamps from an epoch that was relative to the time zone of the writer. This was fixed in later versions by recording the writer time zone in the stripe footer. Hive 3.1 always writes using UTC. Presto used a global configuration for the writer time zone, which was needed to handle old files, but was never updated to use the time zone from the stripe footer. On read, Presto now uses the stripe value if present, otherwise it uses the configured value. On write, Presto continues to write timestamps using the configured time zone, but now records this value when writing files.
1 parent 82ff60d commit f086b52

29 files changed

+147
-71
lines changed

presto-orc/src/main/java/io/prestosql/orc/OrcRecordReader.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545

4646
import java.io.Closeable;
4747
import java.io.IOException;
48+
import java.time.ZoneId;
4849
import java.util.ArrayList;
4950
import java.util.Iterator;
5051
import java.util.List;
@@ -227,6 +228,7 @@ public OrcRecordReader(
227228

228229
stripeReader = new StripeReader(
229230
orcDataSource,
231+
hiveStorageTimeZone.toTimeZone().toZoneId(),
230232
decompressor,
231233
types,
232234
this.presentColumns,
@@ -236,7 +238,7 @@ public OrcRecordReader(
236238
metadataReader,
237239
writeValidation);
238240

239-
streamReaders = createStreamReaders(orcDataSource, types, hiveStorageTimeZone, presentColumnsAndTypes.build(), streamReadersSystemMemoryContext);
241+
streamReaders = createStreamReaders(orcDataSource, types, presentColumnsAndTypes.build(), streamReadersSystemMemoryContext);
240242
maxBytesPerCell = new long[streamReaders.length];
241243
nextBatchSize = initialBatchSize;
242244
}
@@ -511,9 +513,10 @@ private void advanceToNextStripe()
511513
// Give readers access to dictionary streams
512514
InputStreamSources dictionaryStreamSources = stripe.getDictionaryStreamSources();
513515
List<ColumnEncoding> columnEncodings = stripe.getColumnEncodings();
516+
ZoneId timeZone = stripe.getTimeZone();
514517
for (StreamReader column : streamReaders) {
515518
if (column != null) {
516-
column.startStripe(dictionaryStreamSources, columnEncodings);
519+
column.startStripe(timeZone, dictionaryStreamSources, columnEncodings);
517520
}
518521
}
519522

@@ -553,7 +556,6 @@ private void validateWritePageChecksum()
553556
private static StreamReader[] createStreamReaders(
554557
OrcDataSource orcDataSource,
555558
List<OrcType> types,
556-
DateTimeZone hiveStorageTimeZone,
557559
Map<Integer, Type> includedColumns,
558560
AggregatedMemoryContext systemMemoryContext)
559561
{
@@ -564,7 +566,7 @@ private static StreamReader[] createStreamReaders(
564566
for (int columnId = 0; columnId < rowType.getFieldCount(); columnId++) {
565567
if (includedColumns.containsKey(columnId)) {
566568
StreamDescriptor streamDescriptor = streamDescriptors.get(columnId);
567-
streamReaders[columnId] = StreamReaders.createStreamReader(streamDescriptor, hiveStorageTimeZone, systemMemoryContext);
569+
streamReaders[columnId] = StreamReaders.createStreamReader(streamDescriptor, systemMemoryContext);
568570
}
569571
}
570572
return streamReaders;

presto-orc/src/main/java/io/prestosql/orc/OrcWriteValidation.java

+25
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import io.prestosql.spi.type.VarcharType;
5151
import org.openjdk.jol.info.ClassLayout;
5252

53+
import java.time.ZoneId;
5354
import java.util.ArrayList;
5455
import java.util.HashMap;
5556
import java.util.List;
@@ -102,6 +103,7 @@ public enum OrcWriteValidationMode
102103
private final OrcEncoding orcEncoding;
103104
private final List<Integer> version;
104105
private final CompressionKind compression;
106+
private final ZoneId timeZone;
105107
private final int rowGroupMaxRowCount;
106108
private final List<String> columnNames;
107109
private final Map<String, Slice> metadata;
@@ -115,6 +117,7 @@ private OrcWriteValidation(
115117
OrcEncoding orcEncoding,
116118
List<Integer> version,
117119
CompressionKind compression,
120+
ZoneId timeZone,
118121
int rowGroupMaxRowCount,
119122
List<String> columnNames,
120123
Map<String, Slice> metadata,
@@ -127,6 +130,7 @@ private OrcWriteValidation(
127130
this.orcEncoding = orcEncoding;
128131
this.version = version;
129132
this.compression = compression;
133+
this.timeZone = timeZone;
130134
this.rowGroupMaxRowCount = rowGroupMaxRowCount;
131135
this.columnNames = columnNames;
132136
this.metadata = metadata;
@@ -157,6 +161,20 @@ public CompressionKind getCompression()
157161
return compression;
158162
}
159163

164+
public ZoneId getTimeZone()
165+
{
166+
return timeZone;
167+
}
168+
169+
public void validateTimeZone(OrcDataSourceId orcDataSourceId, ZoneId actualTimeZone)
170+
throws OrcCorruptionException
171+
{
172+
// DWRF does not store the writer time zone
173+
if (!isDwrf() && !timeZone.equals(actualTimeZone)) {
174+
throw new OrcCorruptionException(orcDataSourceId, "Unexpected time zone");
175+
}
176+
}
177+
160178
public int getRowGroupMaxRowCount()
161179
{
162180
return rowGroupMaxRowCount;
@@ -852,6 +870,7 @@ public static class OrcWriteValidationBuilder
852870

853871
private List<Integer> version;
854872
private CompressionKind compression;
873+
private ZoneId timeZone;
855874
private int rowGroupMaxRowCount;
856875
private int stringStatisticsLimitInBytes;
857876
private List<String> columnNames;
@@ -886,6 +905,11 @@ public void setCompression(CompressionKind compression)
886905
this.compression = compression;
887906
}
888907

908+
public void setTimeZone(ZoneId timeZone)
909+
{
910+
this.timeZone = timeZone;
911+
}
912+
889913
public void setRowGroupMaxRowCount(int rowGroupMaxRowCount)
890914
{
891915
this.rowGroupMaxRowCount = rowGroupMaxRowCount;
@@ -952,6 +976,7 @@ public OrcWriteValidation build()
952976
orcEncoding,
953977
version,
954978
compression,
979+
timeZone,
955980
rowGroupMaxRowCount,
956981
columnNames,
957982
metadata,

presto-orc/src/main/java/io/prestosql/orc/OrcWriter.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,14 @@
4646

4747
import java.io.Closeable;
4848
import java.io.IOException;
49+
import java.time.ZoneId;
4950
import java.util.ArrayList;
5051
import java.util.Collections;
5152
import java.util.HashMap;
5253
import java.util.List;
5354
import java.util.Map;
5455
import java.util.Map.Entry;
56+
import java.util.Optional;
5557
import java.util.function.Consumer;
5658
import java.util.stream.Collectors;
5759

@@ -139,6 +141,7 @@ public OrcWriter(
139141
this.orcEncoding = requireNonNull(orcEncoding, "orcEncoding is null");
140142
this.compression = requireNonNull(compression, "compression is null");
141143
recordValidation(validation -> validation.setCompression(compression));
144+
recordValidation(validation -> validation.setTimeZone(hiveStorageTimeZone.toTimeZone().toZoneId()));
142145

143146
requireNonNull(options, "options is null");
144147
checkArgument(options.getStripeMaxSize().compareTo(options.getStripeMinSize()) >= 0, "stripeMaxSize must be greater than stripeMinSize");
@@ -411,7 +414,8 @@ private List<OrcDataOutput> bufferStripeData(long stripeStartOffset, FlushReason
411414
columnStatistics.put(0, new ColumnStatistics((long) stripeRowCount, 0, null, null, null, null, null, null, null, null));
412415

413416
// add footer
414-
StripeFooter stripeFooter = new StripeFooter(allStreams, toDenseList(columnEncodings, orcTypes.size()));
417+
Optional<ZoneId> timeZone = Optional.of(hiveStorageTimeZone.toTimeZone().toZoneId());
418+
StripeFooter stripeFooter = new StripeFooter(allStreams, toDenseList(columnEncodings, orcTypes.size()), timeZone);
415419
Slice footer = metadataWriter.writeStripeFooter(stripeFooter);
416420
outputData.add(createDataOutput(footer));
417421

presto-orc/src/main/java/io/prestosql/orc/Stripe.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.prestosql.orc.metadata.ColumnEncoding;
1818
import io.prestosql.orc.stream.InputStreamSources;
1919

20+
import java.time.ZoneId;
2021
import java.util.List;
2122

2223
import static com.google.common.base.MoreObjects.toStringHelper;
@@ -25,13 +26,15 @@
2526
public class Stripe
2627
{
2728
private final long rowCount;
29+
private final ZoneId timeZone;
2830
private final List<ColumnEncoding> columnEncodings;
2931
private final List<RowGroup> rowGroups;
3032
private final InputStreamSources dictionaryStreamSources;
3133

32-
public Stripe(long rowCount, List<ColumnEncoding> columnEncodings, List<RowGroup> rowGroups, InputStreamSources dictionaryStreamSources)
34+
public Stripe(long rowCount, ZoneId timeZone, List<ColumnEncoding> columnEncodings, List<RowGroup> rowGroups, InputStreamSources dictionaryStreamSources)
3335
{
3436
this.rowCount = rowCount;
37+
this.timeZone = requireNonNull(timeZone, "timeZone is null");
3538
this.columnEncodings = requireNonNull(columnEncodings, "columnEncodings is null");
3639
this.rowGroups = ImmutableList.copyOf(requireNonNull(rowGroups, "rowGroups is null"));
3740
this.dictionaryStreamSources = requireNonNull(dictionaryStreamSources, "dictionaryStreamSources is null");
@@ -42,6 +45,11 @@ public long getRowCount()
4245
return rowCount;
4346
}
4447

48+
public ZoneId getTimeZone()
49+
{
50+
return timeZone;
51+
}
52+
4553
public List<ColumnEncoding> getColumnEncodings()
4654
{
4755
return columnEncodings;
@@ -62,6 +70,7 @@ public String toString()
6270
{
6371
return toStringHelper(this)
6472
.add("rowCount", rowCount)
73+
.add("timeZone", timeZone)
6574
.add("columnEncodings", columnEncodings)
6675
.add("rowGroups", rowGroups)
6776
.add("dictionaryStreams", dictionaryStreamSources)

presto-orc/src/main/java/io/prestosql/orc/StripeReader.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545

4646
import java.io.IOException;
4747
import java.io.InputStream;
48+
import java.time.ZoneId;
4849
import java.util.ArrayList;
4950
import java.util.HashMap;
5051
import java.util.LinkedHashSet;
@@ -75,6 +76,7 @@
7576
public class StripeReader
7677
{
7778
private final OrcDataSource orcDataSource;
79+
private final ZoneId defaultTimeZone;
7880
private final Optional<OrcDecompressor> decompressor;
7981
private final List<OrcType> types;
8082
private final HiveWriterVersion hiveWriterVersion;
@@ -85,6 +87,7 @@ public class StripeReader
8587
private final Optional<OrcWriteValidation> writeValidation;
8688

8789
public StripeReader(OrcDataSource orcDataSource,
90+
ZoneId defaultTimeZone,
8891
Optional<OrcDecompressor> decompressor,
8992
List<OrcType> types,
9093
Set<Integer> includedColumns,
@@ -95,6 +98,7 @@ public StripeReader(OrcDataSource orcDataSource,
9598
Optional<OrcWriteValidation> writeValidation)
9699
{
97100
this.orcDataSource = requireNonNull(orcDataSource, "orcDataSource is null");
101+
this.defaultTimeZone = requireNonNull(defaultTimeZone, "defaultTimeZone is null");
98102
this.decompressor = requireNonNull(decompressor, "decompressor is null");
99103
this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));
100104
this.includedOrcColumns = getIncludedOrcColumns(types, requireNonNull(includedColumns, "includedColumns is null"));
@@ -111,6 +115,10 @@ public Stripe readStripe(StripeInformation stripe, AggregatedMemoryContext syste
111115
// read the stripe footer
112116
StripeFooter stripeFooter = readStripeFooter(stripe, systemMemoryUsage);
113117
List<ColumnEncoding> columnEncodings = stripeFooter.getColumnEncodings();
118+
if (writeValidation.isPresent()) {
119+
writeValidation.get().validateTimeZone(orcDataSource.getId(), stripeFooter.getTimeZone().orElse(null));
120+
}
121+
ZoneId timeZone = stripeFooter.getTimeZone().orElse(defaultTimeZone);
114122

115123
// get streams for selected columns
116124
Map<StreamId, Stream> streams = new HashMap<>();
@@ -182,7 +190,7 @@ public Stripe readStripe(StripeInformation stripe, AggregatedMemoryContext syste
182190
selectedRowGroups,
183191
columnEncodings);
184192

185-
return new Stripe(stripe.getNumberOfRows(), columnEncodings, rowGroups, dictionaryStreamSources);
193+
return new Stripe(stripe.getNumberOfRows(), timeZone, columnEncodings, rowGroups, dictionaryStreamSources);
186194
}
187195
catch (InvalidCheckpointException e) {
188196
// The ORC file contains a corrupt checkpoint stream
@@ -241,7 +249,7 @@ public Stripe readStripe(StripeInformation stripe, AggregatedMemoryContext syste
241249
}
242250
RowGroup rowGroup = new RowGroup(0, 0, stripe.getNumberOfRows(), minAverageRowBytes, new InputStreamSources(builder.build()));
243251

244-
return new Stripe(stripe.getNumberOfRows(), columnEncodings, ImmutableList.of(rowGroup), dictionaryStreamSources);
252+
return new Stripe(stripe.getNumberOfRows(), timeZone, columnEncodings, ImmutableList.of(rowGroup), dictionaryStreamSources);
245253
}
246254

247255
public Map<StreamId, OrcInputStream> readDiskRanges(long stripeOffset, Map<StreamId, DiskRange> diskRanges, AggregatedMemoryContext systemMemoryUsage)

presto-orc/src/main/java/io/prestosql/orc/metadata/DwrfMetadataReader.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public StripeFooter readStripeFooter(List<OrcType> types, InputStream inputStrea
129129
{
130130
CodedInputStream input = CodedInputStream.newInstance(inputStream);
131131
DwrfProto.StripeFooter stripeFooter = DwrfProto.StripeFooter.parseFrom(input);
132-
return new StripeFooter(toStream(stripeFooter.getStreamsList()), toColumnEncoding(types, stripeFooter.getColumnsList()));
132+
return new StripeFooter(toStream(stripeFooter.getStreamsList()), toColumnEncoding(types, stripeFooter.getColumnsList()), Optional.empty());
133133
}
134134

135135
private static Stream toStream(DwrfProto.Stream stream)

presto-orc/src/main/java/io/prestosql/orc/metadata/ExceptionWrappingMetadataReader.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public StripeFooter readStripeFooter(List<OrcType> types, InputStream inputStrea
8383
try {
8484
return delegate.readStripeFooter(types, inputStream);
8585
}
86-
catch (IOException e) {
86+
catch (IOException | RuntimeException e) {
8787
throw propagate(e, "Invalid stripe footer");
8888
}
8989
}

presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataReader.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,10 @@
4444
import java.util.List;
4545
import java.util.Map;
4646
import java.util.Optional;
47+
import java.util.TimeZone;
4748

4849
import static com.google.common.base.Preconditions.checkArgument;
50+
import static com.google.common.base.Strings.emptyToNull;
4951
import static com.google.common.collect.ImmutableList.toImmutableList;
5052
import static io.airlift.slice.SliceUtf8.lengthOfCodePoint;
5153
import static io.airlift.slice.SliceUtf8.tryGetCodePointAt;
@@ -159,7 +161,11 @@ public StripeFooter readStripeFooter(List<OrcType> types, InputStream inputStrea
159161
{
160162
CodedInputStream input = CodedInputStream.newInstance(inputStream);
161163
OrcProto.StripeFooter stripeFooter = OrcProto.StripeFooter.parseFrom(input);
162-
return new StripeFooter(toStream(stripeFooter.getStreamsList()), toColumnEncoding(stripeFooter.getColumnsList()));
164+
return new StripeFooter(
165+
toStream(stripeFooter.getStreamsList()),
166+
toColumnEncoding(stripeFooter.getColumnsList()),
167+
Optional.ofNullable(emptyToNull(stripeFooter.getWriterTimezone()))
168+
.map(zone -> TimeZone.getTimeZone(zone).toZoneId()));
163169
}
164170

165171
private static Stream toStream(OrcProto.Stream stream)

presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataWriter.java

+5
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@
3232

3333
import java.io.IOException;
3434
import java.io.OutputStream;
35+
import java.time.ZoneId;
3536
import java.util.List;
3637
import java.util.Map.Entry;
38+
import java.util.TimeZone;
3739

3840
import static com.google.common.base.Preconditions.checkArgument;
3941
import static java.lang.Math.toIntExact;
@@ -268,13 +270,16 @@ private static UserMetadataItem toUserMetadata(Entry<String, Slice> entry)
268270
public int writeStripeFooter(SliceOutput output, StripeFooter footer)
269271
throws IOException
270272
{
273+
ZoneId zone = footer.getTimeZone().orElseThrow(() -> new IllegalArgumentException("Time zone not set"));
274+
271275
OrcProto.StripeFooter footerProtobuf = OrcProto.StripeFooter.newBuilder()
272276
.addAllStreams(footer.getStreams().stream()
273277
.map(OrcMetadataWriter::toStream)
274278
.collect(toList()))
275279
.addAllColumns(footer.getColumnEncodings().stream()
276280
.map(OrcMetadataWriter::toColumnEncoding)
277281
.collect(toList()))
282+
.setWriterTimezone(TimeZone.getTimeZone(zone).getID())
278283
.build();
279284

280285
return writeProtobufObject(output, footerProtobuf);

presto-orc/src/main/java/io/prestosql/orc/metadata/StripeFooter.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,23 @@
1515

1616
import com.google.common.collect.ImmutableList;
1717

18+
import java.time.ZoneId;
1819
import java.util.List;
20+
import java.util.Optional;
1921

2022
import static java.util.Objects.requireNonNull;
2123

2224
public class StripeFooter
2325
{
2426
private final List<Stream> streams;
2527
private final List<ColumnEncoding> columnEncodings;
28+
private final Optional<ZoneId> timeZone;
2629

27-
public StripeFooter(List<Stream> streams, List<ColumnEncoding> columnEncodings)
30+
public StripeFooter(List<Stream> streams, List<ColumnEncoding> columnEncodings, Optional<ZoneId> timeZone)
2831
{
2932
this.streams = ImmutableList.copyOf(requireNonNull(streams, "streams is null"));
3033
this.columnEncodings = ImmutableList.copyOf(requireNonNull(columnEncodings, "columnEncodings is null"));
34+
this.timeZone = requireNonNull(timeZone, "timeZone is null");
3135
}
3236

3337
public List<ColumnEncoding> getColumnEncodings()
@@ -39,4 +43,9 @@ public List<Stream> getStreams()
3943
{
4044
return streams;
4145
}
46+
47+
public Optional<ZoneId> getTimeZone()
48+
{
49+
return timeZone;
50+
}
4251
}

presto-orc/src/main/java/io/prestosql/orc/reader/BooleanStreamReader.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import javax.annotation.Nullable;
2929

3030
import java.io.IOException;
31+
import java.time.ZoneId;
3132
import java.util.List;
3233

3334
import static com.google.common.base.MoreObjects.toStringHelper;
@@ -130,7 +131,7 @@ private void openRowGroup()
130131
}
131132

132133
@Override
133-
public void startStripe(InputStreamSources dictionaryStreamSources, List<ColumnEncoding> encoding)
134+
public void startStripe(ZoneId timeZone, InputStreamSources dictionaryStreamSources, List<ColumnEncoding> encoding)
134135
{
135136
presentStreamSource = missingStreamSource(BooleanInputStream.class);
136137
dataStreamSource = missingStreamSource(BooleanInputStream.class);

0 commit comments

Comments
 (0)