Skip to content

Commit d76a30a

Browse files
authored
Merge pull request #31036 from tanmaykm/tan/fix31035
start_worker option to close stdin redirect stdout
2 parents ca36e57 + d2b00bd commit d76a30a

File tree

2 files changed

+45
-8
lines changed

2 files changed

+45
-8
lines changed

stdlib/Distributed/src/cluster.jl

+9-8
Original file line numberDiff line numberDiff line change
@@ -207,26 +207,27 @@ worker_timeout() = parse(Float64, get(ENV, "JULIA_WORKER_TIMEOUT", "60.0"))
207207

208208
## worker creation and setup ##
209209
"""
210-
start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin))
210+
start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)
211211
212212
`start_worker` is an internal function which is the default entry point for
213213
worker processes connecting via TCP/IP. It sets up the process as a Julia cluster
214214
worker.
215215
216216
host:port information is written to stream `out` (defaults to stdout).
217217
218-
The function closes stdin (after reading the cookie if required), redirects stderr to stdout,
219-
listens on a free port (or if specified, the port in the `--bind-to` command
220-
line option) and schedules tasks to process incoming TCP connections and requests.
218+
The function reads the cookie from stdin if required, and listens on a free port
219+
(or if specified, the port in the `--bind-to` command line option) and schedules
220+
tasks to process incoming TCP connections and requests. It also (optionally)
221+
closes stdin and redirects stderr to stdout.
221222
222223
It does not return.
223224
"""
224-
start_worker(cookie::AbstractString=readline(stdin)) = start_worker(stdout, cookie)
225-
function start_worker(out::IO, cookie::AbstractString=readline(stdin))
225+
start_worker(cookie::AbstractString=readline(stdin); kwargs...) = start_worker(stdout, cookie; kwargs...)
226+
function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)
226227
init_multi()
227228

228-
close(stdin) # workers will not use it
229-
redirect_stderr(stdout)
229+
close_stdin && close(stdin) # workers will not use it
230+
stderr_to_stdout && redirect_stderr(stdout)
230231

231232
init_worker(cookie)
232233
interface = IPv4(LPROC.bind_addr)

stdlib/Distributed/test/distributed_exec.jl

+36
Original file line numberDiff line numberDiff line change
@@ -1497,6 +1497,42 @@ cluster_cookie("foobar") # custom cookie
14971497
npids = addprocs_with_testenv(WorkerArgTester(`--worker=foobar`, false))
14981498
@test remotecall_fetch(myid, npids[1]) == npids[1]
14991499

1500+
# tests for start_worker options to retain stdio (issue #31035)
1501+
struct RetainStdioTester <: ClusterManager
1502+
close_stdin::Bool
1503+
stderr_to_stdout::Bool
1504+
end
1505+
1506+
function launch(manager::RetainStdioTester, params::Dict, launched::Array, c::Condition)
1507+
dir = params[:dir]
1508+
exename = params[:exename]
1509+
exeflags = params[:exeflags]
1510+
1511+
jlcmd = "using Distributed; start_worker(\"\"; close_stdin=$(manager.close_stdin), stderr_to_stdout=$(manager.stderr_to_stdout));"
1512+
cmd = detach(setenv(`$exename $exeflags --bind-to $(Distributed.LPROC.bind_addr) -e $jlcmd`, dir=dir))
1513+
proc = open(cmd, "r+")
1514+
1515+
wconfig = WorkerConfig()
1516+
wconfig.process = proc
1517+
wconfig.io = proc.out
1518+
push!(launched, wconfig)
1519+
1520+
notify(c)
1521+
end
1522+
manage(::RetainStdioTester, ::Integer, ::WorkerConfig, ::Symbol) = nothing
1523+
1524+
1525+
nprocs()>1 && rmprocs(workers())
1526+
cluster_cookie("")
1527+
1528+
for close_stdin in (true, false), stderr_to_stdout in (true, false)
1529+
npids = addprocs_with_testenv(RetainStdioTester(close_stdin,stderr_to_stdout))
1530+
@test remotecall_fetch(myid, npids[1]) == npids[1]
1531+
@test close_stdin != remotecall_fetch(()->isopen(stdin), npids[1])
1532+
@test stderr_to_stdout == remotecall_fetch(()->(stderr === stdout), npids[1])
1533+
rmprocs(npids)
1534+
end
1535+
15001536
# Issue # 22865
15011537
# Must be run on a new cluster, i.e., all workers must be in the same state.
15021538
rmprocs(workers())

0 commit comments

Comments
 (0)