Skip to content

Commit

Permalink
Avoids converting from Chunk to ByteBuffer within UDP ASG singleton s…
Browse files Browse the repository at this point in the history
…ocket selector thread when doing writes by introducing an ASG private analog of the public Packet data type for the ASG writer which holds a ByteBuffer. The conversion from Chunk is done in the ASG write implementation prior to context switching to selector thread.
  • Loading branch information
sbuzzard committed Aug 28, 2017
1 parent 731acf6 commit ca6e66c
Showing 1 changed file with 28 additions and 12 deletions.
40 changes: 28 additions & 12 deletions io/src/main/scala/fs2/io/udp/AsynchronousSocketGroup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import java.util.concurrent.{ConcurrentLinkedQueue,CountDownLatch}
*
* Each `AsynchronousSocketGroup` is assigned a single daemon thread that performs all read/write operations.
*/

sealed trait AsynchronousSocketGroup {
private[udp] type Context
private[udp] def register(channel: DatagramChannel): Context
Expand All @@ -32,6 +33,11 @@ sealed trait AsynchronousSocketGroup {
}

object AsynchronousSocketGroup {
/*
* Used to avoid copying between Chunk[Byte] and ByteBuffer during writes within the selector thread,
* as it can be expensive depending on particular implementation of Chunk.
*/
private class WriterPacket(val remote: InetSocketAddress, val bytes: ByteBuffer)

def apply(): AsynchronousSocketGroup = new AsynchronousSocketGroup {

Expand All @@ -51,9 +57,9 @@ object AsynchronousSocketGroup {
implicit val ordTimeout: Ordering[Timeout] = Ordering.by[Timeout, Long](_.expiry)
}

class Attachment(
private class Attachment(
readers: ArrayDeque[(Either[Throwable,Packet] => Unit,Option[Timeout])] = new ArrayDeque(),
writers: ArrayDeque[((Packet,Option[Throwable] => Unit),Option[Timeout])] = new ArrayDeque()
writers: ArrayDeque[((WriterPacket,Option[Throwable] => Unit),Option[Timeout])] = new ArrayDeque()
) {

def hasReaders: Boolean = !readers.isEmpty
Expand Down Expand Up @@ -86,12 +92,12 @@ object AsynchronousSocketGroup {

def hasWriters: Boolean = !writers.isEmpty

def peekWriter: Option[(Packet,Option[Throwable] => Unit)] = {
def peekWriter: Option[(WriterPacket,Option[Throwable] => Unit)] = {
if (writers.isEmpty) None
else Some(writers.peek()._1)
}

def dequeueWriter: Option[(Packet,Option[Throwable] => Unit)] = {
def dequeueWriter: Option[(WriterPacket,Option[Throwable] => Unit)] = {
if (writers.isEmpty) None
else {
val (w, timeout) = writers.pop()
Expand All @@ -100,7 +106,7 @@ object AsynchronousSocketGroup {
}
}

def queueWriter(writer: (Packet,Option[Throwable] => Unit), timeout: Option[Timeout]): () => Unit = {
def queueWriter(writer: (WriterPacket,Option[Throwable] => Unit), timeout: Option[Timeout]): () => Unit = {
if (closed) {
writer._2(Some(new ClosedChannelException))
timeout.foreach(_.cancel)
Expand Down Expand Up @@ -173,14 +179,13 @@ object AsynchronousSocketGroup {
}

private def read1(key: SelectionKey, channel: DatagramChannel, attachment: Attachment, reader: Either[Throwable,Packet] => Unit): Boolean = {
readBuffer.clear
try {
val src = channel.receive(readBuffer).asInstanceOf[InetSocketAddress]
if (src eq null) {
false
} else {
readBuffer.flip
val bytes = Array.ofDim[Byte](readBuffer.remaining)
val bytes = new Array[Byte](readBuffer.remaining)
readBuffer.get(bytes)
readBuffer.clear
reader(Right(new Packet(src, Chunk.bytes(bytes))))
Expand All @@ -193,6 +198,17 @@ object AsynchronousSocketGroup {
}

override def write(key: SelectionKey, packet: Packet, timeout: Option[FiniteDuration], cb: Option[Throwable] => Unit): Unit = {
val writerPacket = {
val bytes = {
val srcBytes = packet.bytes.toBytes
if (srcBytes.size == srcBytes.values.size) srcBytes.values else {
val destBytes = new Array[Byte](srcBytes.size)
Array.copy(srcBytes.values, 0, destBytes, srcBytes.offset, srcBytes.size)
destBytes
}
}
new WriterPacket(packet.remote, ByteBuffer.wrap(bytes))
}
onSelectorThread {
val channel = key.channel.asInstanceOf[DatagramChannel]
val attachment = key.attachment.asInstanceOf[Attachment]
Expand All @@ -202,11 +218,11 @@ object AsynchronousSocketGroup {
if (cancelWriter ne null) cancelWriter()
}}
if (attachment.hasWriters) {
cancelWriter = attachment.queueWriter((packet, cb), t)
cancelWriter = attachment.queueWriter((writerPacket, cb), t)
t.foreach { t => pendingTimeouts += t }
} else {
if (!write1(key, channel, attachment, packet, cb)) {
cancelWriter = attachment.queueWriter((packet, cb), t)
if (!write1(key, channel, attachment, writerPacket, cb)) {
cancelWriter = attachment.queueWriter((writerPacket, cb), t)
t.foreach { t => pendingTimeouts += t }
try { key.interestOps(key.interestOps | SelectionKey.OP_WRITE); () }
catch { case t: CancelledKeyException => /* Ignore; key was closed */ }
Expand All @@ -215,9 +231,9 @@ object AsynchronousSocketGroup {
} { cb(Some(new ClosedChannelException)) }
}

private def write1(key: SelectionKey, channel: DatagramChannel, attachment: Attachment, p: Packet, cb: Option[Throwable] => Unit): Boolean = {
private def write1(key: SelectionKey, channel: DatagramChannel, attachment: Attachment, packet: WriterPacket, cb: Option[Throwable] => Unit): Boolean = {
try {
val sent = channel.send(ByteBuffer.wrap(p.bytes.toArray), p.remote)
val sent = channel.send(packet.bytes, packet.remote)
if (sent > 0) {
cb(None)
true
Expand Down

0 comments on commit ca6e66c

Please sign in to comment.