Skip to content

Commit

Permalink
Return read/written rows whenever possible
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu committed Jul 28, 2022
1 parent cdaf8f3 commit 537663b
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
public class ClickHouseResponseSummary implements Serializable {
private static final long serialVersionUID = 6241261266635143197L;

static final String ERROR_CANNOT_UPDATE = "Sealed summary cannot be updated";

public static final ClickHouseResponseSummary EMPTY = new ClickHouseResponseSummary(null, null, true);

/**
Expand Down Expand Up @@ -61,6 +63,11 @@ public long getWrittenRows() {
public long getWrittenBytes() {
return written_bytes;
}

public boolean isEmpty() {
return read_rows == 0L && read_bytes == 0L && total_rows_to_read == 0L && written_rows == 0L
&& written_bytes == 0L;
}
}

/**
Expand Down Expand Up @@ -112,6 +119,10 @@ public boolean hasAppliedLimit() {
public long getRowsBeforeLimit() {
return rows_before_limit;
}

public boolean isEmpty() {
return rows == 0L && blocks == 0L && allocated_bytes == 0L && !applied_limit && rows_before_limit == 0L;
}
}

private final AtomicReference<Progress> progress;
Expand Down Expand Up @@ -139,9 +150,15 @@ public ClickHouseResponseSummary(Progress progress, Statistics stats) {
* @param sealed whether the summary is sealed
*/
protected ClickHouseResponseSummary(Progress progress, Statistics stats, boolean sealed) {
this.progress = new AtomicReference<>(progress != null ? progress : new Progress(0L, 0L, 0L, 0L, 0L));
this.stats = new AtomicReference<>(stats != null ? stats : new Statistics(0L, 0L, 0L, false, 0L));
this.updates = new AtomicInteger(1);
if (progress == null) {
progress = new Progress(0L, 0L, 0L, 0L, 0L);
}
if (stats == null) {
stats = new Statistics(0L, 0L, 0L, false, 0L);
}
this.progress = new AtomicReference<>(progress);
this.stats = new AtomicReference<>(stats);
this.updates = new AtomicInteger(progress.isEmpty() && stats.isEmpty() ? 0 : 1);

this.sealed = sealed;
}
Expand All @@ -159,6 +176,10 @@ public void seal() {
* @return increased update counter
*/
public int update() {
if (sealed) {
throw new IllegalStateException(ERROR_CANNOT_UPDATE);
}

return this.updates.incrementAndGet();
}

Expand All @@ -169,7 +190,7 @@ public int update() {
*/
public void update(Progress progress) {
if (sealed) {
throw new IllegalStateException("Sealed summary cannot be updated");
throw new IllegalStateException(ERROR_CANNOT_UPDATE);
}

if (progress != null) {
Expand All @@ -179,7 +200,7 @@ public void update(Progress progress) {

public void update(Statistics stats) {
if (sealed) {
throw new IllegalStateException("Sealed summary cannot be updated");
throw new IllegalStateException(ERROR_CANNOT_UPDATE);
}

if (stats != null) {
Expand Down Expand Up @@ -228,4 +249,8 @@ public long getWrittenBytes() {
public int getUpdateCount() {
return updates.get();
}

public boolean isEmpty() {
return progress.get().isEmpty() && stats.get().isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,13 @@ private ClickHouseResponse getLastResponse(Map<ClickHouseOption, Serializable> o
} catch (Exception e) {
throw SqlExceptionUtils.handle(e);
} finally {
if (i + 1 < len && response != null) {
if (response == null) {
// something went wrong
} else if (i + 1 < len) {
response.close();
response = null;
} else {
updateResult(stmt, response);
}
}
}
Expand Down Expand Up @@ -166,18 +171,17 @@ protected ClickHouseResponse executeStatement(ClickHouseSqlStatement stmt,

protected int executeInsert(String sql, InputStream input) throws SQLException {
boolean autoTx = connection.getAutoCommit() && connection.isTransactionSupported();
ClickHouseResponseSummary summary = null;
Mutation req = request.write().query(sql, queryId = connection.newQueryId()).data(input);
try (ClickHouseResponse resp = autoTx
? req.executeWithinTransaction(connection.isImplicitTransactionSupported())
: req.transaction(connection.getTransaction()).sendAndWait();
ResultSet rs = updateResult(new ClickHouseSqlStatement(sql, StatementType.INSERT), resp)) {
summary = resp.getSummary();
// ignore
} catch (Exception e) {
throw SqlExceptionUtils.handle(e);
}

return summary != null && summary.getWrittenRows() > 0L ? (int) summary.getWrittenRows() : 1;
return (int) currentUpdateCount;
}

protected ClickHouseSqlStatement getLastStatement() {
Expand Down Expand Up @@ -212,23 +216,17 @@ protected ClickHouseResultSet newEmptyResultSet() throws SQLException {
}

protected ResultSet updateResult(ClickHouseSqlStatement stmt, ClickHouseResponse response) throws SQLException {
ResultSet rs = null;
if (stmt.isQuery() || !response.getColumns().isEmpty()) {
currentUpdateCount = -1L;
currentResult = new ClickHouseResultSet(stmt.getDatabaseOrDefault(getConnection().getCurrentDatabase()),
stmt.getTable(), this, response);
rs = currentResult;
} else {
currentUpdateCount = response.getSummary().getWrittenRows();
// FIXME apparently this is not always true
if (currentUpdateCount <= 0L) {
currentUpdateCount = 1L;
}
currentResult = null;
response.close();
currentUpdateCount = stmt.isDDL() ? 0L
: (response.getSummary().isEmpty() ? 1L : response.getSummary().getWrittenRows());
currentResult = null;
}

return rs == null ? newEmptyResultSet() : rs;
return currentResult;
}

protected ClickHouseStatementImpl(ClickHouseConnectionImpl connection, ClickHouseRequest<?> request,
Expand Down Expand Up @@ -303,18 +301,8 @@ public ResultSet executeQuery(String sql) throws SQLException {
}

parseSqlStatements(sql);

ClickHouseResponse response = getLastResponse(null, null, null);

try {
return updateResult(getLastStatement(), response);
} catch (Exception e) {
if (response != null) {
response.close();
}

throw SqlExceptionUtils.handle(e);
}
getLastResponse(null, null, null);
return currentResult != null ? currentResult : newEmptyResultSet();
}

@Override
Expand All @@ -326,14 +314,11 @@ public long executeLargeUpdate(String sql) throws SQLException {

parseSqlStatements(sql);

ClickHouseResponseSummary summary = null;
try (ClickHouseResponse response = getLastResponse(null, null, null)) {
summary = response.getSummary();
return currentUpdateCount;
} catch (Exception e) {
throw SqlExceptionUtils.handle(e);
}

return summary != null ? summary.getWrittenRows() : 1L;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,7 @@ public void testBatchQuery() throws SQLException {
@Test(dataProvider = "statementAndParams", groups = "integration")
public void testExecuteWithOrWithoutParameters(String tableSuffix, String query, Class<?> clazz,
boolean hasResultSet, String[] params, boolean checkTable) throws SQLException {
int expectedRowCount = "ddl".equals(tableSuffix) ? 0 : 1;
String tableName = "test_execute_ps_" + tableSuffix;
query = query.replace("$table", tableName);
Properties props = new Properties();
Expand Down Expand Up @@ -932,7 +933,7 @@ public void testExecuteWithOrWithoutParameters(String tableSuffix, String query,
if (hasResultSet) {
Assert.assertThrows(SQLException.class, () -> ps.executeLargeBatch());
} else {
Assert.assertEquals(ps.executeLargeBatch(), new long[] { 1L });
Assert.assertEquals(ps.executeLargeBatch(), new long[] { expectedRowCount });
}
if (checkTable)
checkTable(stmt, "select * from " + tableName, params);
Expand All @@ -950,7 +951,7 @@ public void testExecuteWithOrWithoutParameters(String tableSuffix, String query,
if (hasResultSet) {
Assert.assertThrows(SQLException.class, () -> ps.executeBatch());
} else {
Assert.assertEquals(ps.executeBatch(), new int[] { 1 });
Assert.assertEquals(ps.executeBatch(), new int[] { expectedRowCount });
}
if (checkTable)
checkTable(stmt, "select * from " + tableName, params);
Expand All @@ -973,7 +974,7 @@ public void testExecuteWithOrWithoutParameters(String tableSuffix, String query,
if (hasResultSet) {
Assert.assertEquals(ps.executeLargeBatch(), new long[] { Statement.EXECUTE_FAILED });
} else {
Assert.assertEquals(ps.executeLargeBatch(), new long[] { 1L });
Assert.assertEquals(ps.executeLargeBatch(), new long[] { expectedRowCount });
}
if (checkTable)
checkTable(stmt, "select * from " + tableName, params);
Expand All @@ -988,7 +989,7 @@ public void testExecuteWithOrWithoutParameters(String tableSuffix, String query,
if (hasResultSet) {
Assert.assertEquals(ps.executeBatch(), new int[] { Statement.EXECUTE_FAILED });
} else {
Assert.assertEquals(ps.executeBatch(), new int[] { 1 });
Assert.assertEquals(ps.executeBatch(), new int[] { expectedRowCount });
}
if (checkTable)
checkTable(stmt, "select * from " + tableName, params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public void testMutation() throws SQLException {
// [update] tbl a [set] a.b = 1 where a.b != 1[ settings mutation_async=0]
// alter table tbl a update a.b = 1 where a.b != 1
conn.setClientInfo("ApplicationName", "333");
Assert.assertEquals(conn.createStatement().executeUpdate("update test_mutation set b = 22 where b = 1"), 0);
Assert.assertEquals(conn.createStatement().executeUpdate("update test_mutation set b = 22 where b = 1"), 1);

Assert.assertThrows(SQLException.class,
() -> stmt.executeUpdate("update non_existing_table set value=1 where key=1"));
Expand Down Expand Up @@ -407,7 +407,7 @@ public void testExecuteQuery() throws SQLException {
rs = stmt.executeQuery("drop table if exists non_existing_table");
Assert.assertNotNull(rs, "Should never be null");
Assert.assertNull(stmt.getResultSet(), "Should be null");
Assert.assertEquals(stmt.getUpdateCount(), 1);
Assert.assertEquals(stmt.getUpdateCount(), 0);
Assert.assertFalse(rs.next(), "Should has no row");
}
}
Expand Down Expand Up @@ -519,7 +519,7 @@ public void testQuerySystemLog() throws SQLException {
stmt.addBatch("drop table if exists non_existing_table2");
stmt.addBatch("drop table if exists non_existing_table3");
int[] results = stmt.executeBatch();
Assert.assertEquals(results, new int[] { 1, 1, 1 });
Assert.assertEquals(results, new int[] { 0, 0, 0 });
}
}

Expand Down

0 comments on commit 537663b

Please sign in to comment.