diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputFormat.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputFormat.java index ca836b8793..95fc0c8dd3 100644 --- a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputFormat.java +++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputFormat.java @@ -193,6 +193,10 @@ public void write(NullWritable key, Operation operation) } catch (Exception e) { throw new IOException("Encountered an error while writing", e); } + + if (session.countPendingErrors() > 0) { + throw new IOException(session.getPendingErrors().getRowErrors()[0].toString()); + } } @Override diff --git a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableOutputFormat.java b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableOutputFormat.java index 2719ebf9ae..35c2f342a6 100644 --- a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableOutputFormat.java +++ b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableOutputFormat.java @@ -19,13 +19,18 @@ import static org.apache.kudu.test.ClientTestUtil.countRowsInScan; import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions; import static org.apache.kudu.test.ClientTestUtil.getBasicSchema; +import static org.apache.kudu.test.ClientTestUtil.getBasicTableOptionsWithNonCoveredRange; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.kudu.test.KuduTestHarness; +import org.apache.kudu.test.KuduTestHarness.MasterServerConfig; +import org.apache.kudu.test.KuduTestHarness.TabletServerConfig; import org.junit.Rule; import org.junit.Test; @@ -44,6 +49,8 @@ public class ITKuduTableOutputFormat { public KuduTestHarness harness = new KuduTestHarness(); @Test + @MasterServerConfig(flags = { "--rpc_max_message_size=3860733440" }) + @TabletServerConfig(flags = { "--rpc_max_message_size=3860733440" }) public void test() throws Exception { harness.getClient().createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions()); @@ -71,4 +78,42 @@ public void test() throws Exception { AsyncKuduScanner.AsyncKuduScannerBuilder builder = harness.getAsyncClient().newScannerBuilder(table); assertEquals(1, countRowsInScan(builder.build())); } + + @Test + @MasterServerConfig(flags = { "--rpc_max_message_size=3860733440" }) + @TabletServerConfig(flags = { "--rpc_max_message_size=3860733440" }) + public void testWriteException() throws Exception { + harness.getClient().createTable(TABLE_NAME, getBasicSchema(), getBasicTableOptionsWithNonCoveredRange()); + + KuduTableOutputFormat output = new KuduTableOutputFormat(); + Configuration conf = new Configuration(); + conf.set(KuduTableOutputFormat.MASTER_ADDRESSES_KEY, harness.getMasterAddressesAsString()); + conf.set(KuduTableOutputFormat.OUTPUT_TABLE_KEY, TABLE_NAME); + output.setConf(conf); + + String multitonKey = conf.get(KuduTableOutputFormat.MULTITON_KEY); + KuduTable table = KuduTableOutputFormat.getKuduTable(multitonKey); + assertNotNull(table); + + + Insert insert = table.newInsert(); + PartialRow row = insert.getRow(); + row.addInt(0, 201); // outside of range partition + row.addInt(1, 2); + row.addInt(2, 3); + row.addString(3, "a string"); + row.addBoolean(4, true); + + boolean threwException = false; + RecordWriter rw = output.getRecordWriter(null); + try { + rw.write(NullWritable.get(), insert); + } catch (IOException e) { + if (e.getMessage().contains("Row error for primary key=")) { + threwException = true; + } + } + assertTrue("writing a PK outside of range partition is supposed to throw an Exception", threwException); + rw.close(null); + } }