Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Polled SSR Stream #2824

Merged
merged 32 commits into from
Sep 10, 2022
Merged
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
5bf1ae8
Switch to pinned channels.
futursolo Aug 4, 2022
fb2603e
Fix ServerRenderer so it's not blocked until the result is resolved.
futursolo Aug 5, 2022
892b759
Fix tests.
futursolo Aug 5, 2022
29a46bd
Remove unused SendError.
futursolo Aug 5, 2022
20fc4a3
Merge branch 'master' into pinned-channels
futursolo Aug 7, 2022
8a41c3e
Implement a stream to be polled alongside rendering.
futursolo Aug 11, 2022
8688045
Update Buffer Size.
futursolo Aug 11, 2022
3bd9e0a
Make Send renderer work.
futursolo Aug 11, 2022
ed36760
Remove pinned channels.
futursolo Aug 11, 2022
2799a22
Unified Naming.
futursolo Aug 11, 2022
02a838d
Optimise code.
futursolo Aug 12, 2022
3de0770
Restore capacity.
futursolo Aug 12, 2022
689980e
merge 'master' into polled-stream
futursolo Aug 12, 2022
cbbd765
Remove unused profile.
futursolo Aug 12, 2022
2045c49
Merge branch 'master' into polled-stream
futursolo Aug 12, 2022
a470862
Default to separate resolver.
futursolo Aug 12, 2022
3ac02fd
Reduce allocations on string.
futursolo Aug 12, 2022
345b745
Adjust API.
futursolo Aug 13, 2022
d1a6ab3
Remove duplicate trait bound.
futursolo Aug 13, 2022
f0863a4
Update docs.
futursolo Aug 13, 2022
7dcaf6e
Remove capacity setting.
futursolo Aug 13, 2022
ced12ac
Merge branch 'master' into polled-stream
futursolo Aug 14, 2022
475f52e
Merge branch 'master' into polled-stream
futursolo Aug 16, 2022
e757d35
Unsafe?
futursolo Aug 16, 2022
0a399ba
Separate files.
futursolo Aug 16, 2022
6ae5a65
Adjust inlining.
futursolo Aug 16, 2022
f310575
Merge branch 'master' into polled-stream
futursolo Aug 29, 2022
e18e0fd
Fix test.
futursolo Aug 29, 2022
67348d3
Update notice.
futursolo Aug 29, 2022
f491d6c
Update documentation.
futursolo Sep 4, 2022
2a300fe
Merge branch 'master' into polled-stream
futursolo Sep 4, 2022
a78f0b3
Fix tests.
futursolo Sep 4, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Switch to pinned channels.
futursolo committed Aug 4, 2022
commit 5bf1ae83362335a0f25bd449f3c1bf63d412a371
8 changes: 4 additions & 4 deletions packages/yew/Cargo.toml
Original file line number Diff line number Diff line change
@@ -31,8 +31,6 @@ implicit-clone = { version = "0.3", features = ["map"] }
base64ct = { version = "1.5.0", features = ["std"], optional = true }
bincode = { version = "1.3.3", optional = true }
serde = { version = "1", features = ["derive"] }
tokio = { version = "1.19", features = ["sync"] }
tokio-stream = { version = "0.1.9", features = ["sync"] }

[dependencies.web-sys]
version = "0.3"
@@ -75,8 +73,10 @@ wasm-bindgen-futures = "0.4"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
num_cpus = { version = "1.13", optional = true }
tokio-util = { version = "0.7", features = ["rt"], optional = true }
once_cell = "1"
tokio = { version = "1.19", features = ["rt", "time"], optional = true }
tokio-stream = { version = "0.1", features = ["time"], optional = true }
tokio-util = { version = "0.7", features = ["rt"], optional = true }

[dev-dependencies]
wasm-bindgen-test = "0.3"
@@ -93,7 +93,7 @@ features = [
]

[features]
tokio = ["tokio/rt", "tokio/time", "dep:num_cpus", "dep:tokio-util"]
tokio = ["dep:tokio", "dep:num_cpus", "dep:tokio-util", "dep:tokio-stream"]
ssr = ["dep:html-escape", "dep:base64ct", "dep:bincode"]
csr = []
hydration = ["csr", "dep:bincode"]
2 changes: 1 addition & 1 deletion packages/yew/src/html/component/lifecycle.rs
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@ pub(crate) enum ComponentRenderState {

#[cfg(feature = "ssr")]
Ssr {
sender: Option<crate::platform::sync::oneshot::Sender<Html>>,
sender: Option<crate::platform::pinned::oneshot::Sender<Html>>,
},
}

6 changes: 3 additions & 3 deletions packages/yew/src/html/component/scope.rs
Original file line number Diff line number Diff line change
@@ -264,15 +264,15 @@ mod feat_ssr {
use crate::html::component::lifecycle::{
ComponentRenderState, CreateRunner, DestroyRunner, RenderRunner,
};
use crate::platform::io::BufWriter;
use crate::platform::sync::oneshot;
use crate::platform::fmt::BufWrite;
use crate::platform::pinned::oneshot;
use crate::scheduler;
use crate::virtual_dom::Collectable;

impl<COMP: BaseComponent> Scope<COMP> {
pub(crate) async fn render_into_stream(
&self,
w: &mut BufWriter,
w: &mut dyn BufWrite,
props: Rc<COMP::Properties>,
hydratable: bool,
) {
Original file line number Diff line number Diff line change
@@ -5,31 +5,40 @@

use std::borrow::Cow;

use futures::stream::Stream;

use crate::platform::sync::mpsc::{self, UnboundedReceiverStream, UnboundedSender};
use crate::platform::pinned;

// Same as std::io::BufWriter and futures::io::BufWriter.
pub(crate) const DEFAULT_BUF_SIZE: usize = 8 * 1024;

/// A [`futures::io::BufWriter`], but operates over string and yields into a Stream.
pub(crate) struct BufWriter {
buf: String,
tx: UnboundedSender<String>,
capacity: usize,
pub(crate) trait BufSend {
fn buf_send(&self, item: String);
}

/// Creates a Buffer pair.
pub(crate) fn buffer(capacity: usize) -> (BufWriter, impl Stream<Item = String>) {
let (tx, rx) = mpsc::unbounded_channel::<String>();
impl BufSend for pinned::mpsc::UnboundedSender<String> {
fn buf_send(&self, item: String) {
let _ = self.send_now(item);
}
}

let tx = BufWriter {
buf: String::with_capacity(capacity),
tx,
capacity,
};
impl BufSend for futures::channel::mpsc::UnboundedSender<String> {
fn buf_send(&self, item: String) {
let _ = self.unbounded_send(item);
}
}

(tx, UnboundedReceiverStream::new(rx))
pub trait BufWrite {
fn capacity(&self) -> usize;
fn write(&mut self, s: Cow<'_, str>);
}

/// A [`futures::io::BufWriter`], but operates over string and yields into a Stream.
pub(crate) struct BufWriter<S>
where
S: BufSend,
{
buf: String,
tx: S,
capacity: usize,
}

// Implementation Notes:
@@ -53,25 +62,53 @@ pub(crate) fn buffer(capacity: usize) -> (BufWriter, impl Stream<Item = String>)
// 2. If a fixed buffer is used, the rendering process can become blocked if the buffer is filled.
// Using a stream avoids this side effect and allows the renderer to finish rendering
// without being actively polled.
impl BufWriter {
#[inline]
pub fn capacity(&self) -> usize {
self.capacity
impl<S> BufWriter<S>
where
S: BufSend,
{
pub fn new(tx: S, capacity: usize) -> Self {
Self {
buf: String::new(),
tx,
capacity,
}
}

#[inline]
fn drain(&mut self) {
let _ = self.tx.send(self.buf.drain(..).collect());
self.buf.reserve(self.capacity);
if !self.buf.is_empty() {
self.tx.buf_send(self.buf.split_off(0));
}
}

#[inline]
fn reserve(&mut self) {
if self.buf.is_empty() {
self.buf.reserve(self.capacity);
}
}

/// Returns `True` if the internal buffer has capacity to fit a string of certain length.
#[inline]
fn has_capacity_of(&self, next_part_len: usize) -> bool {
self.buf.capacity() >= self.buf.len() + next_part_len
}
}

impl<S> BufWrite for BufWriter<S>
where
S: BufSend,
{
#[inline]
fn capacity(&self) -> usize {
self.capacity
}

/// Writes a string into the buffer, optionally drains the buffer.
pub fn write(&mut self, s: Cow<'_, str>) {
fn write(&mut self, s: Cow<'_, str>) {
// Try to reserve the capacity first.
self.reserve();

if !self.has_capacity_of(s.len()) {
// There isn't enough capacity, we drain the buffer.
self.drain();
@@ -87,17 +124,20 @@ impl BufWriter {
// changes if the buffer was drained. If the buffer capacity didn't change,
// then it means self.has_capacity_of() has returned true the first time which will be
// guaranteed to be matched by the left hand side of this implementation.
let _ = self.tx.send(s.into_owned());
self.tx.buf_send(s.into_owned());
}
}
}

impl Drop for BufWriter {
impl<S> Drop for BufWriter<S>
where
S: BufSend,
{
fn drop(&mut self) {
if !self.buf.is_empty() {
let mut buf = String::new();
std::mem::swap(&mut buf, &mut self.buf);
let _ = self.tx.send(buf);
self.tx.buf_send(buf);
}
}
}
4 changes: 2 additions & 2 deletions packages/yew/src/platform/mod.rs
Original file line number Diff line number Diff line change
@@ -43,9 +43,9 @@
use std::future::Future;

#[cfg(feature = "ssr")]
pub(crate) mod io;
pub(crate) mod fmt;

pub mod sync;
pub mod pinned;
pub mod time;

#[cfg(target_arch = "wasm32")]
6 changes: 6 additions & 0 deletions packages/yew/src/platform/pinned/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
//! Task Synchronisation Primitives for pinned tasks.
//!
//! This module provides task synchronisation for `!Send` futures.

pub mod mpsc;
pub mod oneshot;
Loading