Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(server): release lock before async save to prevent deadlock #1567

Merged
merged 1 commit into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 5 additions & 6 deletions integration/tests/streaming/consumer_offset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ async fn assert_persisted_offset(
consumer_offset: &ConsumerOffset,
expected_offsets_count: u32,
) {
storage.save_consumer_offset(consumer_offset).await.unwrap();
storage
.save_consumer_offset(consumer_offset.offset, &consumer_offset.path)
.await
.unwrap();
let consumer_offsets = storage
.load_consumer_offsets(consumer_offset.kind, path)
.await
Expand All @@ -51,11 +54,7 @@ async fn assert_persisted_offset(
assert_eq!(consumer_offsets.len(), expected_offsets_count);
let loaded_consumer_offset = consumer_offsets.get(expected_offsets_count - 1).unwrap();

// TODO(hubcio): This is a workaround: sometimes offset is 4, sometimes 5
let offset_ok = loaded_consumer_offset.offset == consumer_offset.offset
|| loaded_consumer_offset.offset == consumer_offset.offset + 1
|| loaded_consumer_offset.offset == consumer_offset.offset - 1;
assert!(offset_ok);
assert!(loaded_consumer_offset.offset == consumer_offset.offset);

assert_eq!(loaded_consumer_offset.kind, consumer_offset.kind);
assert_eq!(
Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.210"
version = "0.4.211"
edition = "2021"
build = "src/build.rs"
license = "Apache-2.0"
Expand Down
9 changes: 5 additions & 4 deletions server/src/streaming/partitions/consumer_offsets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,15 @@ impl Partition {
let consumer_offsets = self.get_consumer_offsets(kind);
if let Some(mut consumer_offset) = consumer_offsets.get_mut(&consumer_id) {
consumer_offset.offset = offset;
let path = consumer_offset.path.clone();
drop(consumer_offset);
self.storage
.partition
.save_consumer_offset(&consumer_offset)
.save_consumer_offset(offset, &path)
.await
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to save consumer offset, consumer ID: {}, offset: {}",
consumer_id, offset
"{COMPONENT} (error: {error}) - failed to save consumer offset, consumer ID: {consumer_id}, offset: {offset}, path: {path}",
)
})?;
return Ok(());
Expand All @@ -98,7 +99,7 @@ impl Partition {
let consumer_offset = ConsumerOffset::new(kind, consumer_id, offset, path);
self.storage
.partition
.save_consumer_offset(&consumer_offset)
.save_consumer_offset(offset, &consumer_offset.path)
.await
.with_error_context(|error| {
format!(
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/partitions/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub struct ConsumerOffset {
pub kind: ConsumerKind,
pub consumer_id: u32,
pub offset: u64,
pub path: String,
pub path: Arc<String>,
}

impl ConsumerOffset {
Expand All @@ -63,7 +63,7 @@ impl ConsumerOffset {
kind,
consumer_id,
offset,
path: format!("{path}/{consumer_id}"),
path: Arc::new(format!("{path}/{consumer_id}")),
}
}
}
Expand Down
18 changes: 6 additions & 12 deletions server/src/streaming/partitions/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,21 +335,15 @@ impl PartitionStorage for FilePartitionStorage {
Ok(())
}

async fn save_consumer_offset(&self, offset: &ConsumerOffset) -> Result<(), IggyError> {
async fn save_consumer_offset(&self, offset: u64, path: &str) -> Result<(), IggyError> {
self.persister
.overwrite(&offset.path, &offset.offset.to_le_bytes())
.overwrite(path, &offset.to_le_bytes())
.await
.with_error_context(|error| format!(
"{COMPONENT} (error: {error}) - failed to overwrite consumer offset with value: {}, kind: {}, consumer ID: {}, path: {}",
offset.offset, offset.kind, offset.consumer_id, offset.path,
"{COMPONENT} (error: {error}) - failed to overwrite consumer offset with value: {}, path: {}",
offset, path,
))?;
trace!(
"Stored consumer offset value: {} for {} with ID: {}, path: {}",
offset.offset,
offset.kind,
offset.consumer_id,
offset.path
);
trace!("Stored consumer offset value: {}, path: {}", offset, path);
Ok(())
}

Expand Down Expand Up @@ -390,7 +384,7 @@ impl PartitionStorage for FilePartitionStorage {
continue;
}

let path = path.unwrap().to_string();
let path = Arc::new(path.unwrap().to_string());
let consumer_id = consumer_id.unwrap();
let mut file = file::open(&path)
.await
Expand Down
5 changes: 3 additions & 2 deletions server/src/streaming/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ pub trait PartitionStorage: Send {
fn delete(&self, partition: &Partition) -> impl Future<Output = Result<(), IggyError>> + Send;
fn save_consumer_offset(
&self,
offset: &ConsumerOffset,
offset: u64,
path: &str,
) -> impl Future<Output = Result<(), IggyError>> + Send;
fn load_consumer_offsets(
&self,
Expand Down Expand Up @@ -177,7 +178,7 @@ impl PartitionStorageKind {
-> Result<(), IggyError>;
async fn save(&self, partition: &mut Partition) -> Result<(), IggyError>;
async fn delete(&self, partition: &Partition) -> Result<(), IggyError>;
async fn save_consumer_offset(&self, offset: &ConsumerOffset) -> Result<(), IggyError>;
async fn save_consumer_offset(&self, offset: u64, path: &str) -> Result<(), IggyError>;
async fn load_consumer_offsets(
&self,
kind: ConsumerKind,
Expand Down