@@ -689,9 +689,9 @@ impl OnDisk {
689
689
}
690
690
}
691
691
692
- /// Estimates the db size by multiplying `page_count * page_size`.
692
+ /// Estimates the db size by multiplying `used_page_count * page_size`.
693
693
async fn estimate_spool_size ( & self ) -> Result < i64 , BufferError > {
694
- let size: i64 = sql:: current_size ( )
694
+ let size: i64 = sql:: estimate_size ( )
695
695
. fetch_one ( & self . db )
696
696
. await
697
697
. and_then ( |r| r. try_get ( 0 ) )
@@ -1268,7 +1268,7 @@ impl Drop for BufferService {
1268
1268
mod tests {
1269
1269
use std:: str:: FromStr ;
1270
1270
use std:: sync:: Mutex ;
1271
- use std:: time:: Duration ;
1271
+ use std:: time:: { Duration , Instant } ;
1272
1272
1273
1273
use insta:: assert_debug_snapshot;
1274
1274
use rand:: Rng ;
@@ -1496,7 +1496,7 @@ mod tests {
1496
1496
"envelopes" : {
1497
1497
"path" : std:: env:: temp_dir( ) . join( Uuid :: new_v4( ) . to_string( ) ) ,
1498
1498
"max_memory_size" : "4KB" ,
1499
- "max_disk_size" : "20KB " ,
1499
+ "max_disk_size" : "40KB " ,
1500
1500
}
1501
1501
}
1502
1502
} ) )
@@ -1557,7 +1557,7 @@ mod tests {
1557
1557
. filter ( |name| name. contains ( "buffer." ) )
1558
1558
. collect ( ) ;
1559
1559
1560
- assert_debug_snapshot ! ( captures, @r#"
1560
+ assert_debug_snapshot ! ( captures, @r### "
1561
1561
[
1562
1562
"buffer.envelopes_mem:2000|h",
1563
1563
"buffer.envelopes_mem_count:1|g",
@@ -1569,26 +1569,25 @@ mod tests {
1569
1569
"buffer.writes:1|c",
1570
1570
"buffer.envelopes_written:3|c",
1571
1571
"buffer.envelopes_disk_count:3|g",
1572
- "buffer.disk_size:1031 |h",
1572
+ "buffer.disk_size:24576 |h",
1573
1573
"buffer.envelopes_written:1|c",
1574
1574
"buffer.envelopes_disk_count:4|g",
1575
1575
"buffer.writes:1|c",
1576
- "buffer.disk_size:1372 |h",
1577
- "buffer.disk_size:1372 |h",
1576
+ "buffer.disk_size:24576 |h",
1577
+ "buffer.disk_size:24576 |h",
1578
1578
"buffer.envelopes_written:1|c",
1579
1579
"buffer.envelopes_disk_count:5|g",
1580
1580
"buffer.writes:1|c",
1581
- "buffer.disk_size:1713 |h",
1581
+ "buffer.disk_size:24576 |h",
1582
1582
"buffer.dequeue_attempts:1|h",
1583
1583
"buffer.reads:1|c",
1584
1584
"buffer.envelopes_read:-5|c",
1585
1585
"buffer.envelopes_disk_count:0|g",
1586
1586
"buffer.dequeue_attempts:1|h",
1587
1587
"buffer.reads:1|c",
1588
- "buffer.disk_size:8|h",
1589
- "buffer.reads:1|c",
1588
+ "buffer.disk_size:24576|h",
1590
1589
]
1591
- "# ) ;
1590
+ "### ) ;
1592
1591
}
1593
1592
1594
1593
pub enum TestHealth {
@@ -1879,4 +1878,79 @@ mod tests {
1879
1878
let index = index. lock ( ) . unwrap ( ) . clone ( ) ;
1880
1879
assert_eq ! ( index. len( ) , 100 ) ;
1881
1880
}
1881
+
1882
+ #[ ignore] // Slow. Should probably be a criterion benchmark.
1883
+ #[ tokio:: test]
1884
+ async fn compare_counts ( ) {
1885
+ let path = std:: env:: temp_dir ( ) . join ( Uuid :: new_v4 ( ) . to_string ( ) ) ;
1886
+ let options = SqliteConnectOptions :: new ( )
1887
+ . filename ( path)
1888
+ . journal_mode ( SqliteJournalMode :: Wal )
1889
+ . create_if_missing ( true ) ;
1890
+
1891
+ let db = sqlx:: SqlitePool :: connect_with ( options) . await . unwrap ( ) ;
1892
+
1893
+ println ! ( "Migrating..." ) ;
1894
+ sqlx:: migrate!( "../migrations" ) . run ( & db) . await . unwrap ( ) ;
1895
+
1896
+ let transaction = db. begin ( ) . await . unwrap ( ) ;
1897
+
1898
+ // println!("Inserting...");
1899
+ let mut query_builder = sqlx:: QueryBuilder :: new (
1900
+ "INSERT INTO envelopes (received_at, own_key, sampling_key, envelope) " ,
1901
+ ) ;
1902
+ let n = 10000 ;
1903
+ for i in 0 ..n {
1904
+ if ( i % 100 ) == 0 {
1905
+ println ! ( "Batch {i} of {n}" ) ;
1906
+ }
1907
+
1908
+ let chunk = ( 0 ..5000 ) . map ( |_| ( "" , "" , "this is my chunk how do you like it" , "" ) ) ;
1909
+ let query = query_builder
1910
+ . push_values ( chunk, |mut b, ( key1, key2, value, received_at) | {
1911
+ b. push_bind ( received_at)
1912
+ . push_bind ( key1)
1913
+ . push_bind ( key2)
1914
+ . push_bind ( value) ;
1915
+ } )
1916
+ . build ( ) ;
1917
+
1918
+ query. execute ( & db) . await . unwrap ( ) ;
1919
+
1920
+ query_builder. reset ( ) ;
1921
+ }
1922
+ transaction. commit ( ) . await . unwrap ( ) ;
1923
+
1924
+ let t = Instant :: now ( ) ;
1925
+ let q = sqlx:: query (
1926
+ r#"SELECT SUM(pgsize - unused) FROM dbstat WHERE name="envelopes" AND aggregate = FALSE"# ,
1927
+ ) ;
1928
+ let pgsize: i64 = q. fetch_one ( & db) . await . unwrap ( ) . get ( 0 ) ;
1929
+ println ! ( "pgsize: {} {:?}" , pgsize, t. elapsed( ) ) ;
1930
+
1931
+ let t = Instant :: now ( ) ;
1932
+ let q = sqlx:: query (
1933
+ r#"SELECT SUM(pgsize - unused) FROM dbstat WHERE name="envelopes" AND aggregate = TRUE"# ,
1934
+ ) ;
1935
+ let pgsize_agg: i64 = q. fetch_one ( & db) . await . unwrap ( ) . get ( 0 ) ;
1936
+ println ! ( "pgsize_agg: {} {:?}" , pgsize_agg, t. elapsed( ) ) ;
1937
+
1938
+ let t = Instant :: now ( ) ;
1939
+ let q = sqlx:: query ( r#"SELECT SUM(length(envelope)) FROM envelopes"# ) ;
1940
+ let brute_force: i64 = q. fetch_one ( & db) . await . unwrap ( ) . get ( 0 ) ;
1941
+ println ! ( "brute_force: {} {:?}" , brute_force, t. elapsed( ) ) ;
1942
+
1943
+ let t = Instant :: now ( ) ;
1944
+ let q = sqlx:: query (
1945
+ r#"SELECT (page_count - freelist_count) * page_size as size FROM pragma_page_count(), pragma_freelist_count(), pragma_page_size();"# ,
1946
+ ) ;
1947
+ let pragma: i64 = q. fetch_one ( & db) . await . unwrap ( ) . get ( 0 ) ;
1948
+ println ! ( "pragma: {} {:?}" , pragma, t. elapsed( ) ) ;
1949
+
1950
+ // Result:
1951
+ // pgsize = 2'408'464'463 t.elapsed() = 7.007533833s
1952
+ // pgsize_agg = 2'408'464'463 t.elapsed() = 5.010104791s
1953
+ // brute_force = 1'750'000'000 t.elapsed() = 7.893590875s
1954
+ // pragma = 3'036'307'456 t.elapsed() = 213.417µs
1955
+ }
1882
1956
}
0 commit comments