@@ -19,36 +19,35 @@ package org.apache.kyuubi.spark.connector.hive
 
 import java.lang.{Boolean => JBoolean, Long => JLong}
 
+import scala.util.Try
+
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
-import org.apache.spark.SPARK_VERSION
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
 import org.apache.spark.sql.connector.catalog.TableChange
-import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, First, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
+import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes, calculateSingleLocationSize}
 import org.apache.spark.sql.execution.datasources.{PartitionDirectory, PartitionedFile}
 import org.apache.spark.sql.hive.execution.HiveFileFormat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
 
-import org.apache.kyuubi.spark.connector.common.SparkUtils.SPARK_RUNTIME_VERSION
 import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors, DynMethods}
 import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
 
 object HiveConnectorUtils extends Logging {
 
-  // SPARK-43186
-  def getHiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat = {
-    if (SPARK_RUNTIME_VERSION >= "3.5") {
+  def getHiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat =
+    Try { // SPARK-43186: 3.5.0
       DynConstructors.builder()
         .impl(classOf[HiveFileFormat], classOf[FileSinkDesc])
         .build[HiveFileFormat]()
         .newInstance(fileSinkConf)
-    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+    }.recover { case _: Exception =>
       val shimFileSinkDescClz = DynClasses.builder()
         .impl("org.apache.spark.sql.hive.HiveShim$ShimFileSinkDesc")
         .build()
@@ -67,34 +66,26 @@ object HiveConnectorUtils extends Logging {
         .impl(classOf[HiveFileFormat], shimFileSinkDescClz)
         .build[HiveFileFormat]()
         .newInstance(shimFileSinkDesc)
-    } else {
-      throw unsupportedSparkVersion()
-    }
-  }
+    }.get
 
-  // SPARK-41970
-  def partitionedFilePath(file: PartitionedFile): String = {
-    if (SPARK_RUNTIME_VERSION >= "3.4") {
+  def partitionedFilePath(file: PartitionedFile): String =
+    Try { // SPARK-41970: 3.4.0
       invokeAs[String](file, "urlEncodedPath")
-    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+    }.recover { case _: Exception =>
       invokeAs[String](file, "filePath")
-    } else {
-      throw unsupportedSparkVersion()
-    }
-  }
+    }.get
 
   def splitFiles(
       sparkSession: SparkSession,
       file: AnyRef,
       filePath: Path,
-      isSplitable: Boolean,
-      maxSplitBytes: Long,
-      partitionValues: InternalRow): Seq[PartitionedFile] = {
-
-    if (SPARK_RUNTIME_VERSION >= "4.0") { // SPARK-42821
+      isSplitable: JBoolean,
+      maxSplitBytes: JLong,
+      partitionValues: InternalRow): Seq[PartitionedFile] =
+    Try { // SPARK-42821: 4.0.0-preview2
       val fileStatusWithMetadataClz = DynClasses.builder()
         .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
-        .build()
+        .buildChecked()
       DynMethods
         .builder("splitFiles")
         .impl(
@@ -103,35 +94,58 @@ object HiveConnectorUtils extends Logging {
           classOf[Boolean],
           classOf[Long],
           classOf[InternalRow])
-        .build()
-        .invoke[Seq[PartitionedFile]](
+        .buildChecked()
+        .invokeChecked[Seq[PartitionedFile]](
           null,
           file,
-          isSplitable.asInstanceOf[JBoolean],
-          maxSplitBytes.asInstanceOf[JLong],
+          isSplitable,
+          maxSplitBytes,
           partitionValues)
-    } else if (SPARK_RUNTIME_VERSION >= "3.5") { // SPARK-43039
+    }.recover { case _: Exception => // SPARK-51185: Spark 3.5.5
       val fileStatusWithMetadataClz = DynClasses.builder()
         .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
-        .build()
+        .buildChecked()
       DynMethods
         .builder("splitFiles")
         .impl(
           "org.apache.spark.sql.execution.PartitionedFileUtil",
           classOf[SparkSession],
           fileStatusWithMetadataClz,
+          classOf[Path],
           classOf[Boolean],
           classOf[Long],
           classOf[InternalRow])
-        .build()
-        .invoke[Seq[PartitionedFile]](
+        .buildChecked()
+        .invokeChecked[Seq[PartitionedFile]](
+          null,
+          sparkSession,
+          file,
+          filePath,
+          isSplitable,
+          maxSplitBytes,
+          partitionValues)
+    }.recover { case _: Exception => // SPARK-43039: 3.5.0
+      val fileStatusWithMetadataClz = DynClasses.builder()
+        .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
+        .buildChecked()
+      DynMethods
+        .builder("splitFiles")
+        .impl(
+          "org.apache.spark.sql.execution.PartitionedFileUtil",
+          classOf[SparkSession],
+          fileStatusWithMetadataClz,
+          classOf[Boolean],
+          classOf[Long],
+          classOf[InternalRow])
+        .buildChecked()
+        .invokeChecked[Seq[PartitionedFile]](
           null,
           sparkSession,
           file,
-          isSplitable.asInstanceOf[JBoolean],
-          maxSplitBytes.asInstanceOf[JLong],
+          isSplitable,
+          maxSplitBytes,
           partitionValues)
-    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+    }.recover { case _: Exception =>
       DynMethods
         .builder("splitFiles")
         .impl(
@@ -142,55 +156,41 @@ object HiveConnectorUtils extends Logging {
           classOf[Boolean],
           classOf[Long],
           classOf[InternalRow])
-        .build()
-        .invoke[Seq[PartitionedFile]](
+        .buildChecked()
+        .invokeChecked[Seq[PartitionedFile]](
           null,
           sparkSession,
           file,
           filePath,
-          isSplitable.asInstanceOf[JBoolean],
-          maxSplitBytes.asInstanceOf[JLong],
+          isSplitable,
+          maxSplitBytes,
           partitionValues)
-    } else {
-      throw unsupportedSparkVersion()
-    }
-  }
+    }.get
 
-  def createPartitionDirectory(values: InternalRow, files: Seq[FileStatus]): PartitionDirectory = {
-    if (SPARK_RUNTIME_VERSION >= "3.5") {
+  def createPartitionDirectory(values: InternalRow, files: Seq[FileStatus]): PartitionDirectory =
+    Try { // SPARK-43039: 3.5.0
       new DynMethods.Builder("apply")
         .impl(classOf[PartitionDirectory], classOf[InternalRow], classOf[Array[FileStatus]])
         .buildChecked()
         .asStatic()
         .invoke[PartitionDirectory](values, files.toArray)
-    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+    }.recover { case _: Exception =>
       new DynMethods.Builder("apply")
         .impl(classOf[PartitionDirectory], classOf[InternalRow], classOf[Seq[FileStatus]])
         .buildChecked()
         .asStatic()
         .invoke[PartitionDirectory](values, files)
-    } else {
-      throw unsupportedSparkVersion()
-    }
-  }
+    }.get
 
-  def getPartitionFilePath(file: AnyRef): Path = {
-    if (SPARK_RUNTIME_VERSION >= "3.5") {
+  def getPartitionFilePath(file: AnyRef): Path =
+    Try { // SPARK-43039: 3.5.0
       new DynMethods.Builder("getPath")
         .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
         .build()
         .invoke[Path](file)
-    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+    }.recover { case _: Exception =>
       file.asInstanceOf[FileStatus].getPath
-    } else {
-      throw unsupportedSparkVersion()
-    }
-  }
-
-  private def unsupportedSparkVersion(): KyuubiHiveConnectorException = {
-    KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
-      "is not supported by Kyuubi spark hive connector.")
-  }
+    }.get
 
   def calculateTotalSize(
       spark: SparkSession,
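Note on the pattern: this change drops the explicit SPARK_RUNTIME_VERSION branches and the unsupportedSparkVersion() error in favor of a Try { ... }.recover { ... }.get chain. Each block attempts one version-specific reflective path; any Exception falls through to the next block, and the trailing .get rethrows the last failure if every path fails. Below is a minimal, self-contained Scala sketch of that shape (not part of the patch; newerApi/olderApi are illustrative stand-ins, not real Spark or Kyuubi APIs):

import scala.util.Try

object VersionFallbackSketch {
  // Stand-in for a code path that only exists on newer Spark runtimes.
  private def newerApi(): String =
    throw new NoSuchMethodException("unavailable on this runtime")

  // Stand-in for the legacy code path.
  private def olderApi(): String = "resolved via older API"

  // Same shape as getHiveFileFormat/splitFiles above: try the newest path first,
  // fall back on any Exception, and let .get surface the last failure if all paths fail.
  def resolve(): String =
    Try(newerApi())
      .recover { case _: Exception => olderApi() }
      .get

  def main(args: Array[String]): Unit =
    println(resolve()) // prints "resolved via older API"
}

One trade-off of this style: the Try-based chain picks the first reflective path that happens to work on the running Spark, so failures in the preferred path are silently swallowed rather than reported against a declared minimum version.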