[Kernel][Parquet Writer] Fix an issue with writing decimal as binary (#…
## Description
The number of bytes computed for the maximum buffer size used when
writing the decimal type to Parquet is off by one.

Resolved #3152

## How was this patch tested?
Added unit tests that read and write decimals with various precision and scale.
vkorukanti authored Jun 7, 2024
{"commitInfo":{"timestamp":1717778521300,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"3","numOutputBytes":"9126"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.3.0-SNAPSHOT","txnId":"5e3bfa16-cf0f-4d40-ad7d-b6426a6b4b7a"}}
Expand Up @@ -17,12 +17,13 @@

import java.math.{BigDecimal => JBigDecimal}
import java.math.{BigInteger, BigDecimal => JBigDecimal}
import java.sql.Timestamp
import java.time.ZoneOffset.UTC
import java.time.LocalDateTime
import java.util.{Locale, TimeZone}
import java.util.{Locale, Random, TimeZone}

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.implicitConversions

Expand Down Expand Up @@ -1234,6 +1235,53 @@ class GoldenTables extends QueryTest with SharedSparkSession {

generateGoldenTable("decimal-various-scale-precision") { tablePath =>
val fields = ArrayBuffer[StructField]()
Seq(0, 4, 7, 12, 15, 18, 25, 35, 38).foreach { precision =>
Seq.range(start = 0, end = precision, step = 6).foreach { scale =>
StructField(s"decimal_${precision}_${scale}", DecimalType(precision, scale)))

val schema = StructType(fields)

val random = new Random(27 /* seed */)
// Builds a random, non-negative decimal with the given scale whose unscaled value fits in
// `precision` decimal digits. Uses the shared seeded `random`, so output is reproducible.
def generateRandomBigDecimal(precision: Int, scale: Int): JBigDecimal = {
// `new BigInteger(numBits, rnd)` draws uniformly from [0, 2^numBits - 1]. `precision` is passed
// as the BIT count, so the value is always below 10^precision, but it does not cover the full
// range of `precision` decimal digits (and is always 0 when precision is 0).
val unscaledValue = new BigInteger(precision, random)

// Create a BigDecimal with the unscaled value and the specified scale
new JBigDecimal(unscaledValue, scale)

// Build three rows of decimal values. Each row must contain exactly one value per schema
// field, in schema order, so the precision list and the scale step below have to match the
// loop that defines the schema fields.
val rows = ArrayBuffer[Row]()
Seq.range(start = 0, end = 3).foreach { i =>
val rowValues = ArrayBuffer[BigDecimal]()
Seq(0, 4, 7, 12, 15, 18, 25, 35, 38).foreach { precision =>
// Use the same scale step (6) as the schema definition. A different step produces a
// different number of values per row than there are columns (29) in the schema.
Seq.range(start = 0, end = precision, step = 6).foreach { scale =>
i match {
case 0 =>
case 1 =>
// Generate a positive random BigDecimal with the specified precision and scale
rowValues.append(generateRandomBigDecimal(precision, scale))
case 2 =>
// Generate a negative random BigDecimal with the specified precision and scale
rowValues.append(generateRandomBigDecimal(precision, scale).negate())
rows.append(Row(rowValues: _*))

spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

for (parquetFormat <- Seq("v1", "v2")) {
// PARQUET_1_0 doesn't support dictionary encoding for FIXED_LEN_BYTE_ARRAY (only PARQUET_2_0)
generateGoldenTable(s"parquet-decimal-dictionaries-$parquetFormat") { tablePath =>
Expand Up @@ -20,8 +20,10 @@
import static java.lang.String.format;

import org.apache.parquet.schema.*;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.Type.Repetition;
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS;
import static org.apache.parquet.schema.LogicalTypeAnnotation.decimalType;
import static org.apache.parquet.schema.LogicalTypeAnnotation.timestampType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
Expand Down Expand Up @@ -54,7 +56,7 @@ class ParquetSchemaUtils {

static {
List<Integer> maxBytesPerPrecision = new ArrayList<>();
for (int i = 1; i <= 38; i++) {
for (int i = 0; i <= 38; i++) {
int numBytes = 1;
while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, i)) {
numBytes += 1;
Expand Down Expand Up @@ -205,18 +207,21 @@ private static Type toParquetType(
DecimalType decimalType = (DecimalType) dataType;
int precision = decimalType.getPrecision();
int scale = decimalType.getScale();
// DecimalType constructor already has checks to make sure the precision and scale are
// within the valid range. No need to check them again.

DecimalLogicalTypeAnnotation decimalAnnotation = decimalType(scale, precision);
if (precision <= DECIMAL_MAX_DIGITS_IN_INT) {
type = primitive(INT32, repetition)
.as(LogicalTypeAnnotation.decimalType(scale, precision))
} else if (precision <= DECIMAL_MAX_DIGITS_IN_LONG) {
type = primitive(INT64, repetition)
.as(LogicalTypeAnnotation.decimalType(scale, precision))
} else {
type = primitive(FIXED_LEN_BYTE_ARRAY, repetition)
.as(LogicalTypeAnnotation.decimalType(scale, precision))
Expand Up @@ -184,6 +184,18 @@ class DeltaTableReadsSuite extends AnyFunSuite with TestUtils {

test(s"end to end: reading decimal-various-scale-precision") {
val tablePath = goldenTablePath("decimal-various-scale-precision")
val expResults = spark.sql(s"SELECT * FROM delta.`$tablePath`")

path = goldenTablePath("decimal-various-scale-precision"),
expectedAnswer = expResults

// Table/Snapshot tests
Expand Up @@ -153,6 +153,21 @@ class ParquetFileWriterSuite extends AnyFunSuite
4 // how many columns have the stats collected from given list above
// Decimal types with various precision and scales
Seq((10000, 1)).map {
case (targetFileSize, expParquetFileCount) =>
"write decimal various scales and precision (with stats)", // test name
3, /* expected number of rows written to Parquet files */
Option.empty[Predicate], // predicate for filtering what rows to write to parquet files
Seq.empty, tableSchema(goldenTablePath("decimal-various-scale-precision"))),
29 // how many columns have the stats collected from given list above
).flatten.foreach {
case (name, input, fileSize, expFileCount, expRowCount, predicate, statsCols, expStatsColCnt) =>
