Skip to content

Commit

Permalink
fix(server): correct getting messages by timestamp from unsaved_buffer (
Browse files Browse the repository at this point in the history
#1570)

This commit fixes the functionality to retrieve messages by timestamp
when all messages are stored in the unsaved buffer. The changes include
modifications to the `BatchAccumulator` and `Segment` structs to handle
timestamp-based retrieval efficiently. The `get_messages_by_timestamp`
method has been added to both structs, allowing for precise message
retrieval based on timestamps.
  • Loading branch information
hubcio authored Feb 21, 2025
1 parent 045502e commit 66db4c3
Show file tree
Hide file tree
Showing 13 changed files with 142 additions and 73 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion integration/tests/streaming/get_by_offset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ fn index_cache_disabled() -> bool {

#[test_matrix(
[msg_size(50), msg_size(1000), msg_size(20000)],
[msgs_req_to_save(10), msgs_req_to_save(24)],
[msgs_req_to_save(10), msgs_req_to_save(24), msgs_req_to_save(1000)],
[segment_size(500), segment_size(2000), segment_size(100000)],
[msg_cache_size(0), msg_cache_size(5000), msg_cache_size(50000), msg_cache_size(2000000)],
[index_cache_disabled(), index_cache_enabled()])]
Expand Down
6 changes: 3 additions & 3 deletions integration/tests/streaming/get_by_timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ fn index_cache_disabled() -> bool {

#[test_matrix(
[msg_size(50), msg_size(1000), msg_size(20000)],
[msgs_req_to_save(3), msgs_req_to_save(10)],
[msgs_req_to_save(3), msgs_req_to_save(10), msgs_req_to_save(1000)],
[segment_size(500), segment_size(2000), segment_size(100000)],
[msg_cache_size(0), msg_cache_size(5000), msg_cache_size(50000), msg_cache_size(2000000)],
[index_cache_disabled(), index_cache_enabled()])]
Expand Down Expand Up @@ -262,7 +262,7 @@ async fn test_get_messages_by_timestamp(
"Message timestamp {} at position {} is less than initial timestamp {}",
loaded_message.timestamp,
i,
initial_timestamp.as_micros()
initial_timestamp
);
}

Expand All @@ -285,7 +285,7 @@ async fn test_get_messages_by_timestamp(
msg.timestamp >= span_timestamp.as_micros(),
"Message timestamp {} should be >= span timestamp {}",
msg.timestamp,
span_timestamp.as_micros()
span_timestamp
);
}
}
2 changes: 1 addition & 1 deletion integration/tests/streaming/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ async fn should_persist_messages_and_then_load_them_by_timestamp() {
"Message timestamp {} at position {} is less than test timestamp {}",
loaded_message.timestamp,
i,
test_timestamp.as_micros()
test_timestamp
);
assert_eq!(
loaded_message
Expand Down
6 changes: 3 additions & 3 deletions integration/tests/streaming/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ async fn should_load_existing_segment_from_disk() {
Arc::new(AtomicU64::new(0)),
);
loaded_segment.load_from_disk().await.unwrap();
let loaded_messages = loaded_segment.get_messages(0, 10).await.unwrap();
let loaded_messages = loaded_segment.get_messages_by_offset(0, 10).await.unwrap();

assert_eq!(loaded_segment.partition_id, segment.partition_id);
assert_eq!(loaded_segment.start_offset, segment.start_offset);
Expand Down Expand Up @@ -189,7 +189,7 @@ async fn should_persist_and_load_segment_with_messages() {
);
loaded_segment.load_from_disk().await.unwrap();
let messages = loaded_segment
.get_messages(0, messages_count as u32)
.get_messages_by_offset(0, messages_count as u32)
.await
.unwrap();
assert_eq!(messages.len(), messages_count as usize);
Expand Down Expand Up @@ -279,7 +279,7 @@ async fn should_persist_and_load_segment_with_messages_with_nowait_confirmation(
);
loaded_segment.load_from_disk().await.unwrap();
let messages = loaded_segment
.get_messages(0, messages_count as u32)
.get_messages_by_offset(0, messages_count as u32)
.await
.unwrap();
assert_eq!(messages.len(), messages_count as usize);
Expand Down
2 changes: 1 addition & 1 deletion sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.6.202"
version = "0.6.203"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "Apache-2.0"
Expand Down
6 changes: 6 additions & 0 deletions sdk/src/utils/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ impl From<IggyTimestamp> for u64 {
}
}

impl From<SystemTime> for IggyTimestamp {
fn from(timestamp: SystemTime) -> Self {
IggyTimestamp(timestamp)
}
}

impl Add<SystemTime> for IggyTimestamp {
type Output = IggyTimestamp;

Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.212"
version = "0.4.213"
edition = "2021"
build = "src/build.rs"
license = "Apache-2.0"
Expand Down
12 changes: 12 additions & 0 deletions server/src/streaming/batching/batch_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ impl BatchAccumulator {
self.messages[start_idx..end_idx].to_vec()
}

/// Returns up to `count` buffered (unsaved) messages whose timestamp is
/// greater than or equal to `start_timestamp`.
///
/// Uses a binary search (`partition_point`) to find the first qualifying
/// message, which assumes `self.messages` is sorted by ascending timestamp
/// — NOTE(review): presumably guaranteed by append order; confirm.
pub fn get_messages_by_timestamp(
&self,
start_timestamp: u64,
count: usize,
) -> Vec<Arc<RetainedMessage>> {
// First index whose timestamp is >= start_timestamp.
let start_idx = self
.messages
.partition_point(|msg| msg.timestamp < start_timestamp);
// Clamp the end so the slice never runs past the buffer.
let end_idx = std::cmp::min(start_idx + count, self.messages.len());
// `to_vec` clones `Arc`s (refcount bumps only); payloads stay shared.
self.messages[start_idx..end_idx].to_vec()
}

pub fn is_empty(&self) -> bool {
self.messages.is_empty()
}
Expand Down
89 changes: 31 additions & 58 deletions server/src/streaming/partitions/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use tracing::{trace, warn};
const EMPTY_MESSAGES: Vec<RetainedMessage> = vec![];

impl Partition {
/// Retrieves messages by timestamp (up to a specified count).
pub async fn get_messages_by_timestamp(
&self,
timestamp: IggyTimestamp,
Expand All @@ -28,71 +29,40 @@ impl Partition {
self.partition_id
);

if self.segments.is_empty() {
if self.segments.is_empty() || count == 0 {
return Ok(Vec::new());
}

let query_ts = timestamp.as_micros();
let mut all_messages = Vec::new();
let mut remaining = count;
let mut start_segment_index = None;

for (i, segment) in self.segments.iter().enumerate() {
let indexes = if self.config.segment.cache_indexes {
segment
.indexes
.clone()
.filter(|index| !index.is_empty())
.map(|index| index[0])
} else {
segment
.load_index_for_timestamp(query_ts)
.await
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to load index for timestamp, partition: {}, \
segment start offset: {}, timestamp: {}",
self, segment.start_offset, query_ts
)
})?
};
if indexes.is_some() {
start_segment_index = Some(i);
break;
let mut messages = Vec::new();
let mut remaining = count as usize;

for segment in &self.segments {
if segment.end_timestamp < query_ts {
continue;
}
}

if let Some(segment_index) = start_segment_index {
for segment in &self.segments[segment_index..] {
if remaining == 0 {
break;
}
let messages = self
.get_messages_by_offset(
segment.start_offset,
segment.get_messages_count() as u32,
let segment_messages = segment
.get_messages_by_timestamp(query_ts, remaining)
.await
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to get messages from segment, \
partition: {}, segment start: {}, end: {}",
self, segment.start_offset, segment.end_offset
)
.await
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to get messages by offset, partition: {}, \
timestamp: {}, start offset: {}",
self, query_ts, segment.start_offset
)
})?;

let filtered: Vec<_> = messages
.into_iter()
.filter(|msg| msg.timestamp >= query_ts)
.take(remaining as usize)
.collect();
})?;

let num_messages = segment_messages.len();
messages.extend(segment_messages);
remaining -= num_messages;

remaining = remaining.saturating_sub(filtered.len() as u32);
all_messages.extend(filtered);
if remaining == 0 {
break;
}
}

Ok(all_messages)
Ok(messages)
}

// Retrieves messages by offset (up to a specified count).
Expand All @@ -118,7 +88,11 @@ impl Partition {
let segments = self.filter_segments_by_offsets(start_offset, end_offset);
match segments.len() {
0 => Ok(Vec::new()),
1 => segments[0].get_messages(start_offset, count).await,
1 => {
segments[0]
.get_messages_by_offset(start_offset, count)
.await
}
_ => Self::get_messages_from_segments(segments, start_offset, count).await,
}
}
Expand Down Expand Up @@ -187,6 +161,7 @@ impl Partition {

self.get_messages_by_offset(offset, count).await
}

fn get_end_offset(&self, offset: u64, count: u32) -> u64 {
let mut end_offset = offset + (count - 1) as u64;
let segment = self.segments.last().unwrap();
Expand Down Expand Up @@ -225,7 +200,7 @@ impl Partition {
break;
}
let segment_messages = segment
.get_messages(offset, remaining_count)
.get_messages_by_offset(offset, remaining_count)
.await
.with_error_context(|error| {
format!(
Expand Down Expand Up @@ -288,7 +263,6 @@ impl Partition {
break;
}
if segment_size_bytes > remaining_size {
// Last segment is bigger than the remaining size, so we need to get the newest messages from it.
let partial_batches = segment
.get_newest_batches_by_size(remaining_size)
.await
Expand All @@ -302,7 +276,6 @@ impl Partition {
break;
}

// Current segment is smaller than the remaining size, so we need to get all messages from it.
let segment_batches = segment
.get_all_batches()
.await
Expand Down
74 changes: 72 additions & 2 deletions server/src/streaming/segments/reading_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,39 @@ impl Segment {
self.current_offset - self.start_offset + 1
}

pub async fn get_messages(
/// Returns up to `count` messages with timestamp >= `start_timestamp`,
/// reading persisted messages from disk first and then topping up from the
/// in-memory unsaved-messages accumulator if the request is not yet satisfied.
pub async fn get_messages_by_timestamp(
    &self,
    start_timestamp: u64,
    count: usize,
) -> Result<Vec<Arc<RetainedMessage>>, IggyError> {
    if count == 0 {
        return Ok(Vec::new());
    }

    // Persisted messages come first — they are strictly older than anything
    // still sitting in the unsaved buffer.
    let mut result = self
        .load_messages_from_disk_by_timestamp(start_timestamp, count)
        .await?;

    // Fill the remainder from the unsaved buffer, if one exists.
    if result.len() < count {
        if let Some(accumulator) = &self.unsaved_messages {
            let needed = count - result.len();
            result.extend(accumulator.get_messages_by_timestamp(start_timestamp, needed));
        }
    }

    // Guarantee we never hand back more than requested.
    result.truncate(count);
    Ok(result)
}

pub async fn get_messages_by_offset(
&self,
mut offset: u64,
count: u32,
Expand Down Expand Up @@ -84,7 +116,7 @@ impl Segment {
}

pub async fn get_all_messages(&self) -> Result<Vec<Arc<RetainedMessage>>, IggyError> {
self.get_messages(self.start_offset, self.get_messages_count() as u32)
self.get_messages_by_offset(self.start_offset, self.get_messages_count() as u32)
.await
}

Expand Down Expand Up @@ -177,6 +209,44 @@ impl Segment {
Ok(index)
}

/// Loads up to `count` persisted messages with timestamp >= `start_timestamp`.
///
/// Locates the index entry covering `start_timestamp`, reads every batch from
/// there to the end of the segment, and filters individual messages by
/// timestamp (a batch may straddle the requested boundary).
async fn load_messages_from_disk_by_timestamp(
    &self,
    start_timestamp: u64,
    count: usize,
) -> Result<Vec<Arc<RetainedMessage>>, IggyError> {
    // No index entry at or after the timestamp means nothing on disk qualifies.
    let Some(start_index) = self.load_index_for_timestamp(start_timestamp).await? else {
        return Ok(Vec::new());
    };

    // Open-ended range: from the located index entry to the segment's end.
    let range = IndexRange {
        start: start_index,
        end: Index {
            offset: u32::MAX,
            position: u32::MAX,
            timestamp: u64::MAX,
        },
    };
    let batches = self.load_batches_by_range(&range).await?;

    let mut collected = Vec::with_capacity(count);
    'outer: for batch in batches {
        for message in batch.into_messages_iter() {
            // Skip messages older than requested within the first batch.
            if message.timestamp < start_timestamp {
                continue;
            }
            collected.push(Arc::new(message));
            if collected.len() >= count {
                break 'outer;
            }
        }
    }

    Ok(collected)
}

/// Loads and verifies message checksums from the log file.
pub async fn load_message_checksums(&self) -> Result<(), IggyError> {
self.log_reader
Expand Down
Loading

0 comments on commit 66db4c3

Please sign in to comment.