diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ea20f6d..bde16d7 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -13,7 +13,6 @@ jobs: strategy: matrix: julia-version: - - '1.8' - '1.9' - '1.10' - '1.11' diff --git a/Project.toml b/Project.toml index 10a3127..9430adf 100644 --- a/Project.toml +++ b/Project.toml @@ -8,7 +8,9 @@ version = "0.4.10" ConcurrentCollections = "5060bff5-0b44-40c5-b522-fcd3ca5cecdd" DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" +DistributedNext = "fab6aee4-877b-4bac-a744-3eca44acbb6f" Mmap = "a63ad114-7e13-5084-954f-fe012c677804" +Preferences = "21216c6a-2e73-6563-6e65-726566657250" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" ScopedValues = "7e506255-f358-4e82-b7e4-beb19740aa63" Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" @@ -18,12 +20,15 @@ UnsafeAtomics = "013be700-e6cd-48c3-b4a1-df204f14c38f" [compat] ConcurrentCollections = "0.1" DataStructures = "0.18" +DistributedNext = "1" +Preferences = "1" ScopedValues = "1" UnsafeAtomics = "0.2" -julia = "1.8" +julia = "1.9" [extras] Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" +Preferences = "21216c6a-2e73-6563-6e65-726566657250" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" diff --git a/src/MemPool.jl b/src/MemPool.jl index 6f800ed..7edd3c8 100644 --- a/src/MemPool.jl +++ b/src/MemPool.jl @@ -1,5 +1,6 @@ module MemPool +import Preferences: @load_preference, @set_preferences! using Serialization, Sockets, Random import Serialization: serialize, deserialize export DRef, FileRef, poolset, poolget, mmwrite, mmread, cleanup @@ -52,6 +53,20 @@ unwrap_payload(f::FileRef) = unwrap_payload(open(deserialize, f.file, "r+")) approx_size(f::FileRef) = f.size +# Preferences settings + +""" + set_distributed_package!(value[="Distributed|DistributedNext"]) + +Set a [preference](https://github.com/JuliaPackaging/Preferences.jl) for using +either the Distributed.jl stdlib or DistributedNext.jl. You will need to restart +Julia after setting a new preference. +""" +function set_distributed_package!(value) + @set_preferences!("distributed-package" => value) + @info "MemPool.jl preference has been set, restart your Julia session for this change to take effect!" +end + include("io.jl") include("lock.jl") include("read_write_lock.jl") diff --git a/src/datastore.jl b/src/datastore.jl index 3e3881b..c02903e 100644 --- a/src/datastore.jl +++ b/src/datastore.jl @@ -1,4 +1,10 @@ -using Distributed +if @load_preference("distributed-package") == "DistributedNext" + using DistributedNext + import DistributedNext: ClusterSerializer, worker_id_from_socket +else + using Distributed + import Distributed: ClusterSerializer, worker_id_from_socket +end mutable struct DRef owner::Int @@ -26,13 +32,13 @@ function Serialization.serialize(io::AbstractSerializer, d::DRef) _pooltransfer_send(io, d) end -function _pooltransfer_send(io::Distributed.ClusterSerializer, d::DRef) - pid = Distributed.worker_id_from_socket(io.io) +function _pooltransfer_send(io::ClusterSerializer, d::DRef) + pid = worker_id_from_socket(io.io) if pid != -1 pooltransfer_send_local(d, pid) return end - pid = Distributed.worker_id_from_socket(io) + pid = worker_id_from_socket(io) if pid != -1 pooltransfer_send_local(d, pid) return @@ -60,7 +66,7 @@ function Serialization.deserialize(io::AbstractSerializer, dt::Type{DRef}) _pooltransfer_recv(io, d) return d end -function _pooltransfer_recv(io::Distributed.ClusterSerializer, d) +function _pooltransfer_recv(io::ClusterSerializer, d) # Add a new reference manually, and unref on finalization DEBUG_REFCOUNTING[] && _enqueue_work(Core.print, "<- (", d.owner, ", ", d.id, ") at ", myid(), "\n") poolref(d, true) diff --git a/test/runtests.jl b/test/runtests.jl index 6c66149..52d30d1 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -855,6 +855,17 @@ end MemPool.retain_on_device!(sdevice, x1, false) end +@testset "Preferences" begin + cmd = `$(Base.julia_cmd()) --startup-file=no --project -E 'using MemPool; parentmodule(MemPool.addprocs)'` + + cd(dirname(Base.active_project())) do + @test readchomp(cmd) == "Distributed" + + MemPool.set_distributed_package!("DistributedNext") + @test readchomp(cmd) == "DistributedNext" + end +end + #= TODO Allocate, write non-CPU A, write non-CPU B, handle for A was explicitly deleted Allocate, chain write and reads such that write starts before, and finishes after, read, ensure ordering is correct