Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Malt.InProcessWorker #39

Merged
merged 2 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 67 additions & 25 deletions src/Malt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,24 @@ end

unwrap_worker_result(result::WorkerResult) = result.should_throw ? throw(result.value) : result.value

abstract type AbstractWorker end

"""
Malt.InProcessWorker(mod::Module=Main)

This implements the same functions as `Malt.Worker` but runs in the same
process as the caller.
"""
mutable struct InProcessWorker <: AbstractWorker
host_module::Module
latest_request_task::Task
running::Bool

function InProcessWorker(mod=Main)
task = schedule(Task(() -> nothing))
new(mod, task, true)
end
end

"""
Malt.Worker()
Expand All @@ -45,7 +63,7 @@ julia> w = Malt.worker()
Malt.Worker(0x0000, Process(`…`, ProcessRunning))
```
"""
mutable struct Worker
mutable struct Worker <: AbstractWorker
port::UInt16
proc::Base.Process

Expand All @@ -67,7 +85,7 @@ mutable struct Worker
# Connect
socket = Sockets.connect(port)
_buffer_writes(socket)


# There's no reason to keep the worker process alive after the manager loses its handle.
w = finalizer(w -> @async(stop(w)),
Expand Down Expand Up @@ -96,7 +114,7 @@ function _receive_loop(worker::Worker)
@debug("HOST: io closed.")
break
end

@debug "HOST: Waiting for message"
msg_type = try
if eof(io)
Expand All @@ -116,7 +134,7 @@ function _receive_loop(worker::Worker)
end
# this next line can't fail
msg_id = read(io, MsgID)

msg_data, success = try
deserialize(io), true
catch err
Expand Down Expand Up @@ -153,7 +171,6 @@ function _receive_loop(worker::Worker)
sleep(3)
if isrunning(worker)
@error "HOST: Connection lost with worker, but the process is still running. Killing proces..." exception = (e, catch_backtrace())

kill(worker)
else
# This is a clean exit
Expand Down Expand Up @@ -296,12 +313,22 @@ function remotecall(f, w::Worker, args...; kwargs...)
_new_call_msg(true, f, args, kwargs),
)
end
function remotecall(f, w::InProcessWorker, args...; kwargs...)
w.latest_request_task = @async try
f(args...; kwargs...)
catch ex
ex
end
end

"""
Malt.remotecall_fetch(f, w::Worker, args...; kwargs...)

Shorthand for `fetch(Malt.remotecall(…))`. Blocks and then returns the result of the remote call.
"""
function remotecall_fetch(f, w::AbstractWorker, args...; kwargs...)
fetch(remotecall(f, w, args...; kwargs...))
end
function remotecall_fetch(f, w::Worker, args...; kwargs...)
_send_receive(
w,
Expand All @@ -310,12 +337,14 @@ function remotecall_fetch(f, w::Worker, args...; kwargs...)
)
end


"""
Malt.remotecall_wait(f, w::Worker, args...; kwargs...)

Shorthand for `wait(Malt.remotecall(…))`. Blocks and discards the resulting value.
"""
function remotecall_wait(f, w::AbstractWorker, args...; kwargs...)
wait(remotecall(f, w, args...; kwargs...))
end
function remotecall_wait(f, w::Worker, args...; kwargs...)
_send_receive(
w,
Expand All @@ -325,7 +354,6 @@ function remotecall_wait(f, w::Worker, args...; kwargs...)
end



"""
Malt.remote_do(f, w::Worker, args...; kwargs...)

Expand All @@ -342,7 +370,10 @@ function remote_do(f, w::Worker, args...; kwargs...)
)
nothing
end

function remote_do(f, ::InProcessWorker, args...; kwargs...)
@async f(args...; kwargs...)
nothing
end


## Eval variants
Expand All @@ -368,23 +399,23 @@ julia> Malt.remote_eval_fetch(w, :x)
```

"""
remote_eval(m::Module, w::Worker, expr) = remotecall(Core.eval, w, m, expr)
remote_eval(m::Module, w::AbstractWorker, expr) = remotecall(Core.eval, w, m, expr)


"""
Shorthand for `fetch(Malt.remote_eval(…))`. Blocks and returns the resulting value.
"""
remote_eval_fetch(m::Module, w::Worker, expr) = remotecall_fetch(Core.eval, w, m, expr)
remote_eval_fetch(m::Module, w::AbstractWorker, expr) = remotecall_fetch(Core.eval, w, m, expr)


"""
Shorthand for `wait(Malt.remote_eval(…))`. Blocks and discards the resulting value.
"""
remote_eval_wait(m::Module, w::Worker, expr) = remotecall_wait(Core.eval, w, m, expr)
remote_eval_wait(m::Module, w::AbstractWorker, expr) = remotecall_wait(Core.eval, w, m, expr)


"""
Malt.worker_channel(w::Worker, expr)
Malt.worker_channel(w::AbstractWorker, expr)

Create a channel to communicate with worker `w`. `expr` must be an expression
that evaluates to a Channel. `expr` should assign the Channel to a (global) variable
Expand All @@ -393,21 +424,24 @@ so the worker has a handle that can be used to send messages back to the manager
function worker_channel(w::Worker, expr)
RemoteChannel(w, expr)
end
function worker_channel(w::InProcessWorker, expr)
Core.eval(w.host_module, expr)
end


struct RemoteChannel{T} <: AbstractChannel{T}
worker::Worker
id::UInt64
function RemoteChannel{T}(worker::Worker, expr) where T

function RemoteChannel{T}(worker::Worker, expr) where {T}

id = (worker.current_message_id += MsgID(1))::MsgID
remote_eval_wait(Main, worker, quote
Main._channel_cache[$id] = $expr
end)
new{T}(worker, id)
end

RemoteChannel(w::Worker, expr) = RemoteChannel{Any}(w, expr)
end

Expand All @@ -428,6 +462,7 @@ Base.wait(rc::RemoteChannel) = remote_eval_wait(Main, rc.worker, :(wait(Main._ch
Check whether the worker process `w` is running.
"""
isrunning(w::Worker)::Bool = Base.process_running(w.proc)
isrunning(w::InProcessWorker) = w.running

_assert_is_running(w::Worker) = isrunning(w) || throw(TerminatedWorkerException())

Expand All @@ -448,7 +483,10 @@ function stop(w::Worker)
false
end
end

function stop(w::InProcessWorker)
w.running = false
true
end

"""
Malt.kill(w::Worker)
Expand All @@ -458,8 +496,10 @@ Terminate the worker process `w` forcefully by sending a `SIGTERM` signal.
This is not the recommended way to terminate the process. See `Malt.stop`.
""" # https://youtu.be/dyIilW_eBjc
kill(w::Worker) = Base.kill(w.proc)
kill(::InProcessWorker) = nothing


_wait_for_exit(::AbstractWorker; timeout_s::Real=20) = nothing
function _wait_for_exit(w::Worker; timeout_s::Real=20)
t0 = time()
while isrunning(w)
Expand Down Expand Up @@ -488,24 +528,26 @@ function interrupt(w::Worker)
Base.kill(w.proc, Base.SIGINT)
end
end

function interrupt(w::InProcessWorker)
schedule(w.latest_request_task, InterruptException(); error=true)
end




# Based on `Base.task_done_hook`
function _rethrow_to_repl(e::InterruptException; rethrow_regular::Bool=false)
if isdefined(Base, :active_repl_backend) &&
isdefined(Base.active_repl_backend, :backend_task) &&
isdefined(Base.active_repl_backend, :in_eval) &&
Base.active_repl_backend.backend_task.state === :runnable &&
(isdefined(Base, :Workqueue) || isempty(Base.Workqueue)) &&
Base.active_repl_backend.in_eval
isdefined(Base.active_repl_backend, :backend_task) &&
isdefined(Base.active_repl_backend, :in_eval) &&
Base.active_repl_backend.backend_task.state === :runnable &&
(isdefined(Base, :Workqueue) || isempty(Base.Workqueue)) &&
Base.active_repl_backend.in_eval

@debug "HOST: Rethrowing interrupt to REPL"
@async Base.schedule(Base.active_repl_backend.backend_task, e; error=true)
elseif rethrow_regular
@debug "HOST: Don't know what to do with this interrupt, rethrowing" exception=(e, catch_backtrace())
@debug "HOST: Don't know what to do with this interrupt, rethrowing" exception = (e, catch_backtrace())
rethrow(e)
end
end
Expand Down
6 changes: 3 additions & 3 deletions test/benchmark.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import Distributed
const TEST_BENCHMARK = true


@testset "Benchmark" begin
@testset "Benchmark: $W" for W in (m.InProcessWorker, m.Worker)


w = m.Worker()
w = W()
@test m.isrunning(w) === true


Expand Down Expand Up @@ -96,4 +96,4 @@ end
if TEST_BENCHMARK
@test ratio < 1.1
end
end
end
Loading