This repository was archived by the owner on Jul 1, 2023. It is now read-only.

Comparing changes

base repository: pytorch/tensorpipe
base: 42033c5437fc9c181dc9d0a32df600484e2b0685
head repository: pytorch/tensorpipe
compare: 9646e1a431997edb1579972cef196d8fb97a77a5

Commits on Aug 31, 2020

  1. Fix error: ‘runtime_error’ is not a member of ‘std’ (#204)

    Summary: Pull Request resolved: #204
    
    Reviewed By: beauby
    
    Differential Revision: D23424221
    
    Pulled By: lw
    
    fbshipit-source-id: 7a5913dc2b13c7e7dbff7eed4520536e35efa20a
    cdluminate authored and facebook-github-bot committed Aug 31, 2020
    Commit: 19c514d

Commits on Sep 4, 2020

  1. Fix benchmarks not picking up transports/channels

    Summary: Fixes #163
    
    Reviewed By: beauby
    
    Differential Revision: D23500308
    
    fbshipit-source-id: 64626f9c9eed583f735bdf8df1c5ed64796ec235
    lw authored and facebook-github-bot committed Sep 4, 2020
    Commit: fd3c05d
  2. Add options for multiple payloads/tensors to benchmarks

    Summary: Needed to be able to get more realistic benchmarks, especially of the protobuf serialization part (more payloads/tensors, bigger protobufs).
    
    Reviewed By: beauby
    
    Differential Revision: D23500307
    
    fbshipit-source-id: eeb965ba96db311130e235a0dfcd0da64498563d
    lw authored and facebook-github-bot committed Sep 4, 2020
    Commit: 08c93ae
  3. Add libnop dependency and use for tensor descriptor

    Summary: Include libnop as a dependency but, for now, keep protobuf too. Provide a set of helpers for libnop, primarily to perform type erasure. In order to build and validate these helpers, use them to (de)serialize the tensor descriptors produced/consumed by channels, as they are a simple and self-contained application. Wider migration from protobuf to libnop will come later.
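The commit's "type erasure" helpers let the transport layer handle serializable objects without knowing their concrete type. A minimal sketch of that idea in plain C++ follows; the class names and the string-based "wire format" are illustrative stand-ins, not libnop's actual API (libnop writes a binary stream):

```cpp
#include <cassert>
#include <memory>
#include <string>

// The transport sees only the abstract holder; the concrete payload type
// lives behind it. Names here are hypothetical, not TensorPipe's.
class AbstractHolder {
 public:
  virtual ~AbstractHolder() = default;
  // Serialize the held object into an opaque byte string.
  virtual std::string serialize() const = 0;
};

template <typename T>
class Holder : public AbstractHolder {
 public:
  explicit Holder(T value) : value_(std::move(value)) {}
  T& get() {
    return value_;
  }
  std::string serialize() const override {
    // Stand-in for a real wire format.
    return std::to_string(value_);
  }

 private:
  T value_;
};

// Callers need no knowledge of the concrete payload type.
std::string serializeErased(const AbstractHolder& holder) {
  return holder.serialize();
}
```

A connection can then accept a `std::unique_ptr<AbstractHolder>` (akin to the NopHolder introduced in the next commit) and serialize whatever it holds.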
    
    Reviewed By: heiner
    
    Differential Revision: D22763734
    
    fbshipit-source-id: 75bf8211ba9f89a3ee62e518b9ac44017a4f707e
    lw authored and facebook-github-bot committed Sep 4, 2020
    Commit: ab9fb4b
  4. Replace protobuf with libnop in connections

    Summary:
    This is the big change, where we swap out protobuf for libnop in the biggest use cases, namely the pipe and the channels. Luckily, all these changes are identical, so although this touches many lines, the complexity of the change is limited.
    
    What we do is:
    - change the interface of connections to provide a specialization of read/write for NopHolders, rather than for protobuf Messages.
    - convert all the proto definitions to C++ structs, defined in new nop_types.h files.
    - go through all usages of the protobufs and all callsites of read/write and update them.
    
    This change leaves behind some proto defs which are still being used by some unit tests, as these tests will be ported in later commits and those protos will be removed then.
    
    It also doesn't yet reimplement the specialized codepath that SHM had for protobuf messages: it is left there, albeit "cut off", which means that SHM will fall back to the generic implementation and be slower. This is temporary: in the next commit we'll port that specialized code and re-attach it.
    
    Reviewed By: heiner
    
    Differential Revision: D22763737
    
    fbshipit-source-id: d44be13e827822662a0b0395aff05ece478dbc51
    lw authored and facebook-github-bot committed Sep 4, 2020
    Commit: 4f6e241
  5. Use specialized libnop reader/writer for SHM

    Summary: We had specialized "zero-copy streams" for protobuf that allowed us to write/read directly to/from the shared-memory ringbuffer. Now we do the same for libnop with custom reader/writers.
    
    Reviewed By: beauby, heiner
    
    Differential Revision: D22763736
    
    fbshipit-source-id: 734ceeb490fbcb4e3763ab04120393e460b56ea7
    lw authored and facebook-github-bot committed Sep 4, 2020
    Commit: 19d82af
  6. Remove protobuf

    Summary: Now that no one uses protobuf anymore, remove all that is left.
    
    Reviewed By: heiner
    
    Differential Revision: D22763733
    
    fbshipit-source-id: de6fd11af14a9f3a64f695a8c86fe7e9d179b86e
    lw authored and facebook-github-bot committed Sep 4, 2020
    Commit: 60cb9e6
  7. Fix libnop headers being private in CMake (#206)

    Summary:
    And a missing const_cast
    
    Pull Request resolved: #206
    
    Reviewed By: beauby
    
    Differential Revision: D23540899
    
    Pulled By: lw
    
    fbshipit-source-id: d97bbbfa541f8598aa351c9701b2507e3f7759a0
    lw authored and facebook-github-bot committed Sep 4, 2020
    Commit: 49bcc3e

Commits on Sep 5, 2020

  1. Cuda same-machine channel (#175)

    Summary:
    Introduce the `CudaIpc` channel, which handles exchanging CUDA tensors between processes on the same machine. It leverages NVLink when available.
    
    Pull Request resolved: #175
    
    Reviewed By: lw
    
    Differential Revision: D22355812
    
    Pulled By: beauby
    
    fbshipit-source-id: 0589cb75f58e1c5fcc165e7449074eeb5dc697cf
    beauby authored and facebook-github-bot committed Sep 5, 2020
    Commit: a6c5972

Commits on Sep 8, 2020

  1. Fix CMake build of CudaIpc channel. (#208)

    Summary:
    This wasn't caught by CI as we're currently not running GPU tests on CI.
    
    Pull Request resolved: #208
    
    Reviewed By: lw
    
    Differential Revision: D23573767
    
    Pulled By: beauby
    
    fbshipit-source-id: c948c85e79ecb7482ff2446b128845184ba09e21
    beauby authored and facebook-github-bot committed Sep 8, 2020
    Commit: 779225d

Commits on Sep 15, 2020

  1. Clean up ringbuffer consumer

    Summary:
    With the recent migration from protobuf to libnop we've simplified the set of methods we require the ringbuffer consumer/producer to provide. We also came up with a nice low-level general purpose interface to access the ringbuffer: accessing a contiguous region through either one ptr+len or two (if it wraps).
    
    This diff takes this opportunity to simplify the ringbuffer's methods, starting with the consumer. It makes the "access a contiguous region" method the foundation of everything else: that's now the only place where the ringbuffer logic resides, and everything else builds on top of it like a stack. Note that "everything else" now boils down to two methods: one that, in addition, copies the data out of the ringbuffer into a user-provided buffer, and another that, on top, deals with transactions for you.
    
    The advantage of this change is that the ringbuffer now behaves consistently no matter what one tries to do with it (e.g., we still had some methods that didn't support zero-sized lengths). It also helps robustness: since the same single method's logic is exercised by all accesses, it gets much higher test coverage.
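The layering described above can be sketched as follows. This is an illustrative toy, not TensorPipe's actual classes: one primitive exposes the readable region as one or two (pointer, length) spans (two when it wraps around the end of the storage), and the copying read is built entirely on top of it. For brevity it assumes the writer never outruns the reader:

```cpp
#include <algorithm>
#include <array>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <utility>

class TinyRingBuffer {
 public:
  static constexpr size_t kSize = 8;

  // Foundation: the readable region as up to two contiguous spans.
  // The second span is empty unless the region wraps.
  std::array<std::pair<const char*, size_t>, 2> accessContiguous() const {
    size_t used = head_ - tail_;
    size_t tailIdx = tail_ % kSize;
    size_t firstLen = std::min(used, kSize - tailIdx);
    return {{{data_ + tailIdx, firstLen}, {data_, used - firstLen}}};
  }

  // Built on top: copy data out into a user-provided buffer.
  size_t read(char* dst, size_t len) {
    auto spans = accessContiguous();
    size_t copied = 0;
    for (const auto& span : spans) {
      size_t n = std::min(len - copied, span.second);
      std::memcpy(dst + copied, span.first, n);
      copied += n;
    }
    tail_ += copied;
    return copied;
  }

  // Toy writer; assumes the buffer never overflows.
  void write(const char* src, size_t len) {
    for (size_t i = 0; i < len; ++i) {
      data_[head_++ % kSize] = src[i];
    }
  }

 private:
  char data_[kSize] = {};
  size_t head_ = 0; // total bytes ever written
  size_t tail_ = 0; // total bytes ever read
};
```

Keeping the wrap-around arithmetic in `accessContiguous()` alone means every caller, copying or zero-copy, exercises the same logic.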
    
    Reviewed By: beauby
    
    Differential Revision: D23566823
    
    fbshipit-source-id: bd02e8217682fd5924194714b18de05c6492d9c1
    lw authored and facebook-github-bot committed Sep 15, 2020
    Commit: 125e700
  2. Clean up ringbuffer producer

    Summary: Same as the diff before, but for the producer.
    
    Reviewed By: beauby
    
    Differential Revision: D23566822
    
    fbshipit-source-id: f0026c12d20c5630f57acd7b58d547328de497ce
    lw authored and facebook-github-bot committed Sep 15, 2020
    Commit: feaf9f3
  3. Clear up ownership for shm segments and ringbuffers

    Summary:
    I found it odd that we were using shared_ptrs to store some parts of our ringbuffer/shm-segment code. I can see two reasons:
    - It allows us to have both the consumer and producer hold a shared_ptr to the ringbuffer, so that one could have multiple of them sharing the same ringbuffer, which would be kept alive as long as there were users. In practice I don't think this mattered: each ringbuffer had only one user, both "in the code" (one consumer or one producer, never both) and "logically" (it was owned by one side of the connection, or by reactor, ... and destroyed when they were gone). shared_ptrs weren't buying us anything and in fact were weakening the ownership model.
    - Its aliasing constructor allowed us to bind the lifetime of the shared memory segment to that of the ringbuffer object stored in it, thus allowing us to keep a bunch of things alive by holding just one pointer.
    
    So in the end this resulted in a lot of implicit links, with the end user (the connection, the reactor, ...) holding just a shared_ptr to a consumer/producer and this in turn owning (in a pretend-shared way) a lot of resources, including mmapped memory and file descriptors.
    
    Here I try to make the ownership model more explicit, by separating the resource (the Segment class, which now becomes a movable RAII wrapper which owns the fd and the mmap) from the shallow and stateless helpers used to access it (the ringbuffer, which is a pair of pointers, and the consumer/producer, which are little more than a collection of methods). Consumers/producers are in fact now so simple that they don't have a common base class anymore and are created on-demand when needed.
    
    This removes the usage of shared_ptrs entirely, and I think it clarifies the ownership and gives us stronger guarantees about whether and when resources will be cleaned up.
    
    Also, it allows us to store the ringbuffer header on the stack or as a field of another object without having to do some shared_ptr contortions (an aliased shared_ptr with a custom empty destructor). This will come in handy later, for InfiniBand.
    
    Reviewed By: beauby
    
    Differential Revision: D23567125
    
    fbshipit-source-id: c229c7f96655324787eda02c8545ab7bafadb885
    lw authored and facebook-github-bot committed Sep 15, 2020
    Commit: a655116
  4. Promote SHM's fd & socket RAII wrappers to project-wide helpers

    Summary:
    We had RAII wrappers for file descriptors and sockets in SHM which were quite helpful. In some places, where we couldn't make use of the Fd wrapper, we were "releasing" file descriptors and then having another object re-own them, and in between these steps we were dealing with raw integers. Just like when dealing with raw pointers, that makes me queasy. As for the Socket wrapper, it had a very helpful method for sending fds over a Unix domain socket, which we wanted to use in the tests; in order to do so we had some tests depend on the SHM transport, which was weird.
    
    I think it's nicer to make these helpers available to the entire codebase, so that we can use the Fd wrapper throughout (note how we can now delete its `release` method, and how the Segment no longer closes its file descriptor itself but simply holds an Fd which does it) and we can use the Socket methods easily.
    
    Reusability of these components will become more interesting shortly, with the InfiniBand transport.
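The shape of such an Fd wrapper can be sketched as below. This is a simplified, hypothetical version, not TensorPipe's actual class: it owns the descriptor for its whole lifetime, is movable but not copyable, and deliberately has no `release()` escape hatch that would hand back a raw integer:

```cpp
#include <fcntl.h>
#include <unistd.h>
#include <cassert>
#include <utility>

class Fd {
 public:
  Fd() = default;
  explicit Fd(int fd) : fd_(fd) {}

  ~Fd() {
    if (fd_ >= 0) {
      ::close(fd_);
    }
  }

  // No copies: exactly one owner per descriptor.
  Fd(const Fd&) = delete;
  Fd& operator=(const Fd&) = delete;

  // Moves transfer ownership and leave the source empty.
  Fd(Fd&& other) noexcept : fd_(other.fd_) {
    other.fd_ = -1;
  }
  Fd& operator=(Fd&& other) noexcept {
    if (this != &other) {
      if (fd_ >= 0) {
        ::close(fd_);
      }
      fd_ = other.fd_;
      other.fd_ = -1;
    }
    return *this;
  }

  int fd() const {
    return fd_;
  }

 private:
  int fd_ = -1;
};
```

Objects that need a descriptor (like the Segment) can then hold an `Fd` member and let it do the closing, instead of releasing and re-owning raw integers.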
    
    Reviewed By: beauby
    
    Differential Revision: D23567124
    
    fbshipit-source-id: ca33325a0da4aeb03cde24bb1912049911aa16f2
    lw authored and facebook-github-bot committed Sep 15, 2020
    Commit: 0fbfc1a
  5. Rework SHM's ringbuffer logic

    Summary:
    The recent simplification of the ringbuffer interface allows, in turn, some simplification in its users, i.e., the connection. The read and write operations were in fact following very similar logic, but doing it in very different ways (e.g., the read operation had an `if(error)` after each call to the ringbuffer, and it would clean up and return early in each of them; the write operation instead had an `if(!error)` after each call, and would have a single cleanup at the end).
    
    This diff removes some now obsolete code (handling impossible errors, spinlocking, ...), unifies the logic, making the read and write paths highly symmetric. It also tries to catch errors as early as possible (e.g., if the only "expected" error is ENODATA or ENOSPC, it filters that but throws on anything else), so that the logs we get are more informative.
    
    Reviewed By: beauby
    
    Differential Revision: D23575375
    
    fbshipit-source-id: d8e6ef50b8410e8f812a04ffc4fb74960ac45bf8
    lw authored and facebook-github-bot committed Sep 15, 2020
    Commit: f2d9432
  6. Avoid overly-strict atomics in ringbuffer (#217)

    Summary:
    Pull Request resolved: #217
    
    Disclaimer: I thought this would give us some (maybe small) perf gain, but it doesn't seem like that. Probably because x86 has a strong memory consistency model, and thus all atomics are very strict all the time. We might still see perf gains on other platforms (think ARM), but we're not targeting them yet, so this is not really relevant. Regardless of the absence of perf gains, I still think this diff makes the code cleaner, as it specifies exactly the requirements we need.
    
    We used atomics for the head/tail of the ringbuffers, which in their default behavior use a sequentially consistent memory model, which is the strongest possible one: all sequentially consistent accesses on any memory in the system are ordered in a single global total order. I expected this would cause some overhead (in the code, or in the processor). We don't need such a strong property though: we only need some ordering guarantees around the order of accesses within the ringbuffer, where we control all users. So we can express all the properties we need using the "weaker" acquire/release memory order.
    
    See here for an explanation of memory orders: https://en.cppreference.com/w/cpp/atomic/memory_order
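A minimal single-producer/single-consumer sketch of the acquire/release discipline described above follows. It is illustrative, not TensorPipe's actual ringbuffer: the producer publishes a slot write with a release store of the head, and the consumer observes it with an acquire load, so no sequentially consistent operations are needed:

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

class SpscQueue {
 public:
  static constexpr size_t kSize = 4;

  // Called only by the producer thread.
  bool push(uint32_t value) {
    uint64_t head = head_.load(std::memory_order_relaxed); // we're its only writer
    uint64_t tail = tail_.load(std::memory_order_acquire); // see consumer's frees
    if (head - tail == kSize) {
      return false; // full
    }
    slots_[head % kSize] = value;
    // Release: the slot write above becomes visible before the new head does.
    head_.store(head + 1, std::memory_order_release);
    return true;
  }

  // Called only by the consumer thread.
  bool pop(uint32_t& out) {
    uint64_t tail = tail_.load(std::memory_order_relaxed); // we're its only writer
    uint64_t head = head_.load(std::memory_order_acquire); // see producer's fills
    if (head == tail) {
      return false; // empty
    }
    out = slots_[tail % kSize];
    tail_.store(tail + 1, std::memory_order_release);
    return true;
  }

 private:
  uint32_t slots_[kSize] = {};
  std::atomic<uint64_t> head_{0}; // total pushes
  std::atomic<uint64_t> tail_{0}; // total pops
};
```

On x86 these acquire/release operations compile to plain loads and stores (which is consistent with the perf observation in the disclaimer), but on weakly ordered architectures they emit cheaper barriers than the default sequentially consistent ones.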
    
    Reviewed By: beauby
    
    Differential Revision: D23573343
    
    fbshipit-source-id: 808e80d98d34eaa22fab14979bdcdc0a06419f6b
    lw authored and facebook-github-bot committed Sep 15, 2020
    Commit: 583f74e

Commits on Sep 16, 2020

  1. Use RAII wrapper for mmapped memory

    Summary:
    We already had shm::Segment behave as a RAII wrapper for its mmapped pointer, but it was also doing other things on top. Extracting the mmapped pointer into its own class helps, I think, with readability and reusability. (And we plan to reuse the mmapped pointer in the InfiniBand transport.) Note how the Segment can now get rid of its copy & move assignment/constructor and its destructor, because the default ones will do the job.
    
    This diff also introduces a "pattern" for RAII objects, consisting of a class that wraps a unique_ptr with a custom deleter, implementing the acquisition of the resource in the constructor of the class and the release in the deleter. This then gives us a correct RAII class by delegating most of the logic to the unique_ptr, and highlighting the important logic. If we like such a pattern, I plan to use it more later on (e.g., for the ibverbs objects).
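The "pattern" described above can be sketched as follows: a class wrapping a `unique_ptr` with a custom deleter, acquiring the resource in its constructor and releasing it in the deleter. The class name is illustrative and error handling is elided; this is an assumption-laden sketch, not TensorPipe's actual `common/memory.h`:

```cpp
#include <sys/mman.h>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <memory>

class MmappedPtr {
 public:
  // Acquisition happens in the constructor (anonymous mapping, for the demo;
  // a real version would also check for MAP_FAILED and take an fd/offset).
  explicit MmappedPtr(size_t length)
      : ptr_(
            static_cast<char*>(::mmap(
                nullptr,
                length,
                PROT_READ | PROT_WRITE,
                MAP_PRIVATE | MAP_ANONYMOUS,
                -1,
                0)),
            Deleter{length}) {}

  char* ptr() {
    return ptr_.get();
  }

 private:
  // Release happens in the deleter, which carries the state it needs.
  struct Deleter {
    size_t length;
    void operator()(char* p) const {
      ::munmap(p, length);
    }
  };

  // The unique_ptr does all the lifetime work: the wrapper is automatically
  // move-only and the mapping is unmapped exactly once.
  std::unique_ptr<char, Deleter> ptr_;
};
```

Delegating to `unique_ptr` this way yields correct move semantics and destruction for free, leaving only the acquisition/release logic visible.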
    
    Reviewed By: beauby
    
    Differential Revision: D23679003
    
    fbshipit-source-id: 17276ceedb0ee327a0cdbe9d0d6f48ce475d730f
    lw authored and facebook-github-bot committed Sep 16, 2020
    Commit: f5ea0ca
  2. Clean up shared memory Segment

    Summary:
    This diff caps off the previous stack by doing a few final changes:
    - move some private stuff from the segment's header to its .cc file
    - fix the usage of hugepages flags: we were OR-ing them into the wrong argument (when fixing this the code started failing because hugepages are weird, so for now they are entirely disabled)
    - fix the SFINAE in the segment's factory methods: use, as guard in the signature, just the check of whether the type is an array or not, and do all other checks as static_asserts in the methods. also remove redundant checks.
    
    Reviewed By: beauby
    
    Differential Revision: D23679002
    
    fbshipit-source-id: 1e805d91e17f1afcdb5436522fd4bfe0f4aea925
    lw authored and facebook-github-bot committed Sep 16, 2020
    Commit: 51a10c5

Commits on Sep 18, 2020

  1. Move channel types from channel::Channel:: to channel::. (#210)

    Summary:
    Pull Request resolved: #210
    
    This is a first step towards introducing templated `Channel`/`Context`
    classes for CPU/CUDA tensors.
    
    Differential Revision: D23598034
    
    Test Plan: Imported from OSS
    
    Reviewed By: lw
    
    Pulled By: beauby
    
    fbshipit-source-id: 3936c56e093119f01250927aafd9074488e50030
    beauby authored and facebook-github-bot committed Sep 18, 2020
    Commit: 560dfe6

Commits on Sep 19, 2020

  1. Add TENSORPIPE_SUPPORTS_CUDA flag. (#211)

    Summary:
    Pull Request resolved: #211
    
    + Add a generic `TENSORPIPE_SUPPORTS_CUDA` config macro (controlled by the
    `TP_USE_CUDA` CMake flag) intended to indicate support for CUDA
    tensors throughout TensorPipe.
    + Move definition of config macros (`TENSORPIPE_HAS_XXX`) to a
    separate config.h header to avoid circular include
    dependencies (having xxx.h including tensorpipe.h for accessing config
    macros and tensorpipe.h including xxx.h for convenience).
    
    Differential Revision: D23598039
    
    Test Plan: Imported from OSS
    
    Reviewed By: lw
    
    Pulled By: beauby
    
    fbshipit-source-id: 4b3a0c0b329240a10dcf3b88a4d43507cf36c423
    beauby authored and facebook-github-bot committed Sep 19, 2020
    Commit: cecf865

Commits on Sep 21, 2020

  1. Fix GCC 7 freaking out on a field initializer (#220)

    Summary:
    Not sure why, but GCC 7 complains if we initialize length to zero. We don't really need to do that (length will only be read when the pointer is non-null, and in that case it's explicitly set) so this fixes the issue.
    
    Pull Request resolved: #220
    
    Reviewed By: beauby
    
    Differential Revision: D23803439
    
    Pulled By: lw
    
    fbshipit-source-id: 39f84b58e4e7ab755740660249e8b74cf462f705
    lw authored and facebook-github-bot committed Sep 21, 2020
    Commit: 122efb5
  2. Fix CMake build when SHM disabled. (#221)

    Summary:
    Now that `Socket`/`Fd` have been promoted to `common/`, their implementation should be compiled regardless of whether TensorPipe is compiled with the SHM transport, otherwise some linkers will complain about `virtual ~Fd()` being an undefined symbol (cf. https://app.circleci.com/pipelines/github/pytorch/pytorch/216673/workflows/8221802b-16fd-42b2-b801-c105ab39ace1/jobs/7624520).
    In order for the implementation of `Socket` to build on OSX (which does not offer the `SOCK_NONBLOCK` flag for `socket()`), we have to manually set the socket to non-blocking via `fcntl`.
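The portable fallback amounts to flipping `O_NONBLOCK` on the descriptor after creation. A small sketch (helper name is illustrative; error handling abbreviated):

```cpp
#include <fcntl.h>
#include <unistd.h>
#include <cassert>

// Equivalent to passing SOCK_NONBLOCK to socket(), but works on platforms
// (like macOS) that lack that flag: read the current file status flags and
// set them again with O_NONBLOCK added.
bool setNonBlocking(int fd) {
  int flags = ::fcntl(fd, F_GETFL, 0);
  if (flags < 0) {
    return false;
  }
  return ::fcntl(fd, F_SETFL, flags | O_NONBLOCK) >= 0;
}
```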
    
    Pull Request resolved: #221
    
    Reviewed By: lw
    
    Differential Revision: D23809663
    
    Pulled By: beauby
    
    fbshipit-source-id: 479e20682432d78ba8683d38ffd8a0bf275ced5d
    Lucas Hosseini authored and facebook-github-bot committed Sep 21, 2020
    Commit: 49c0a09
  3. Make Channel API accept buffer structs rather than raw pointers. (#45014)
    
    Summary:
    Pull Request resolved: pytorch/pytorch#45014
    
    Pull Request resolved: #219
    
    Pull Request resolved: #212
    
    + Introduce buffer.h defining the buffer struct(s). The `CpuBuffer`
    struct is always defined, while the `CudaBuffer` struct is defined
    only when `TENSORPIPE_SUPPORTS_CUDA` is true.
    + Update all channels to take a `CpuBuffer` or `CudaBuffer` for
    `send`/`recv` rather than a raw pointer and a length.
    + Make the base `Channel`/`Context` classes templated on `TBuffer`,
    effectively creating two channel hierarchies (one for CPU channels,
    one for CUDA channels).
    + Update the Pipe and the generic channel tests to use the new API. So
    far, generic channel tests are CPU only, and tests for the CUDA IPC
    channel are (temporarily) disabled. A subsequent PR will take care of
    refactoring tests so that generic tests work for CUDA channels.
    Another PR will add support for CUDA tensors in the Pipe.
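The shape of the new API described in the bullets above can be sketched like this. It is a simplified illustration, not TensorPipe's actual classes: buffer structs replace (pointer, length) argument pairs, and the base channel class is templated on the buffer type, yielding separate CPU and CUDA hierarchies:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>

// Always defined; a CudaBuffer counterpart would exist only when
// TENSORPIPE_SUPPORTS_CUDA is true.
struct CpuBuffer {
  void* ptr{nullptr};
  size_t length{0};
};

// Base class templated on the buffer type: Channel<CpuBuffer> and
// Channel<CudaBuffer> form two distinct hierarchies.
template <typename TBuffer>
class Channel {
 public:
  virtual ~Channel() = default;
  virtual void send(TBuffer buffer) = 0;
  virtual void recv(TBuffer buffer) = 0;
};

// A toy CPU channel that "transfers" by copying through a staging area,
// just to show a concrete instantiation of the template.
class LoopbackCpuChannel : public Channel<CpuBuffer> {
 public:
  void send(CpuBuffer buffer) override {
    staged_ = buffer;
  }
  void recv(CpuBuffer buffer) override {
    std::memcpy(
        buffer.ptr, staged_.ptr, std::min(buffer.length, staged_.length));
  }

 private:
  CpuBuffer staged_{};
};
```

Compared to raw `(void*, size_t)` parameters, the struct keeps pointer and length together and gives the type system a hook for dispatching CPU vs. CUDA buffers.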
    
    Differential Revision: D23598033
    
    Test Plan: Imported from OSS
    
    Reviewed By: lw
    
    Pulled By: beauby
    
    fbshipit-source-id: a18ca76e1453f2b5d091a2d0103362818eafa028
    beauby authored and Lucas Hosseini committed Sep 21, 2020
    Commit: 9646e1a
Showing 3,959 additions and 3,119 deletions.
  1. +5 −4 .circleci/config.yml
  2. +3 −0 .gitmodules
  3. +0 −3 CMakeLists.txt
  4. +0 −581 cmake/FindProtobuf.cmake
  5. +7 −0 cmake/Options.cmake
  6. +0 −127 cmake/ProtobufGenerate.cmake
  7. +0 −73 cmake/SelectLibraryConfigurations.cmake
  8. +0 −6 docs/development.md
  9. +33 −28 tensorpipe/CMakeLists.txt
  10. +2 −2 tensorpipe/benchmark/CMakeLists.txt
  11. +150 −72 tensorpipe/benchmark/benchmark_pipe.cc
  12. +13 −1 tensorpipe/benchmark/options.cc
  13. +2 −0 tensorpipe/benchmark/options.h
  14. +20 −36 tensorpipe/channel/basic/channel.cc
  15. +5 −9 tensorpipe/channel/basic/channel.h
  16. +6 −6 tensorpipe/channel/basic/context.cc
  17. +4 −4 tensorpipe/channel/basic/context.h
  18. +10 −11 tensorpipe/channel/channel.h
  19. +41 −43 tensorpipe/channel/cma/channel.cc
  20. +5 −9 tensorpipe/channel/cma/channel.h
  21. +6 −6 tensorpipe/channel/cma/context.cc
  22. +4 −4 tensorpipe/channel/cma/context.h
  23. +3 −2 tensorpipe/channel/context.h
  24. +22 −0 tensorpipe/channel/cpu_context.h
  25. +22 −0 tensorpipe/channel/cuda_context.h
  26. +567 −0 tensorpipe/channel/cuda_ipc/channel.cc
  27. +64 −0 tensorpipe/channel/cuda_ipc/channel.h
  28. +156 −0 tensorpipe/channel/cuda_ipc/context.cc
  29. +63 −0 tensorpipe/channel/cuda_ipc/context.h
  30. +18 −9 tensorpipe/channel/helpers.cc
  31. +3 −6 tensorpipe/channel/helpers.h
  32. +64 −73 tensorpipe/channel/mpt/channel.cc
  33. +5 −9 tensorpipe/channel/mpt/channel.h
  34. +30 −33 tensorpipe/channel/mpt/context.cc
  35. +4 −4 tensorpipe/channel/mpt/context.h
  36. +47 −0 tensorpipe/channel/mpt/nop_types.h
  37. +1 −1 tensorpipe/channel/registry.cc
  38. +2 −2 tensorpipe/channel/registry.h
  39. +38 −41 tensorpipe/channel/xth/channel.cc
  40. +5 −9 tensorpipe/channel/xth/channel.h
  41. +6 −6 tensorpipe/channel/xth/context.cc
  42. +4 −4 tensorpipe/channel/xth/context.h
  43. +10 −5 tensorpipe/{proto/channel/xth.proto → common/cpu_buffer.h}
  44. +23 −0 tensorpipe/common/cuda_buffer.h
  45. +2 −2 tensorpipe/common/defs.h
  46. +21 −0 tensorpipe/common/error.cc
  47. +36 −0 tensorpipe/common/error.h
  48. +2 −6 tensorpipe/{transport/shm → common}/fd.cc
  49. +4 −14 tensorpipe/{transport/shm → common}/fd.h
  50. +59 −0 tensorpipe/common/memory.h
  51. +234 −0 tensorpipe/common/nop.h
  52. +15 −36 tensorpipe/{transport/shm → common}/socket.cc
  53. +11 −28 tensorpipe/{transport/shm → common}/socket.h
  54. +6 −6 tensorpipe/{proto/channel/cma.proto → config.h.in}
  55. +62 −0 tensorpipe/core/buffer.h
  56. +10 −6 tensorpipe/core/context.cc
  57. +8 −4 tensorpipe/core/context.h
  58. +20 −19 tensorpipe/core/listener.cc
  59. +4 −3 tensorpipe/core/message.h
  60. +118 −0 tensorpipe/core/nop_types.h
  61. +189 −178 tensorpipe/core/pipe.cc
  62. +0 −31 tensorpipe/proto/channel/mpt.proto
  63. +0 −79 tensorpipe/proto/core.proto
  64. +9 −9 tensorpipe/python/tensorpipe.cc
  65. +12 −3 tensorpipe/{tensorpipe.h.in → tensorpipe.h}
  66. +13 −6 tensorpipe/test/CMakeLists.txt
  67. +5 −1 tensorpipe/test/channel/basic/basic_test.cc
  68. +174 −81 tensorpipe/test/channel/channel_test.cc
  69. +58 −17 tensorpipe/test/channel/channel_test.h
  70. +5 −1 tensorpipe/test/channel/cma/cma_test.cc
  71. +208 −0 tensorpipe/test/channel/cuda_ipc/cuda_ipc_test.cc
  72. +22 −0 tensorpipe/test/channel/cuda_ipc/kernel.cu
  73. +15 −0 tensorpipe/test/channel/cuda_ipc/kernel.cuh
  74. +5 −1 tensorpipe/test/channel/mpt/mpt_test.cc
  75. +5 −1 tensorpipe/test/channel/xth/xth_test.cc
  76. +18 −18 tensorpipe/test/core/context_test.cc
  77. +0 −17 tensorpipe/test/proto/core_test.cc
  78. +20 −10 tensorpipe/test/transport/connection_test.cc
  79. +18 −10 tensorpipe/test/transport/shm/connection_test.cc
  80. +1 −0 tensorpipe/test/transport/shm/loop_test.cc
  81. +2 −1 tensorpipe/test/transport/shm/reactor_test.cc
  82. +1 −1 tensorpipe/test/transport/shm/sockaddr_test.cc
  83. +0 −125 tensorpipe/test/util/ringbuffer/protobuf_streams_test.cc
  84. +185 −126 tensorpipe/test/util/ringbuffer/ringbuffer_test.cc
  85. +36 −20 tensorpipe/test/util/ringbuffer/shm_ringbuffer_test.cc
  86. +27 −22 tensorpipe/test/util/shm/segment_test.cc
  87. +13 −14 tensorpipe/transport/connection.cc
  88. +7 −18 tensorpipe/transport/connection.h
  89. +0 −23 tensorpipe/transport/error.cc
  90. +0 −36 tensorpipe/transport/error.h
  91. +195 −226 tensorpipe/transport/shm/connection.cc
  92. +5 −5 tensorpipe/transport/shm/connection.h
  93. +0 −1 tensorpipe/transport/shm/context.cc
  94. +1 −1 tensorpipe/transport/shm/listener.cc
  95. +3 −1 tensorpipe/transport/shm/listener.h
  96. +1 −1 tensorpipe/transport/shm/loop.h
  97. +14 −22 tensorpipe/transport/shm/reactor.cc
  98. +9 −10 tensorpipe/transport/shm/reactor.h
  99. +57 −0 tensorpipe/transport/shm/sockaddr.cc
  100. +51 −0 tensorpipe/transport/shm/sockaddr.h
  101. +9 −4 tensorpipe/transport/uv/sockaddr.h
  102. +112 −113 tensorpipe/util/ringbuffer/consumer.h
  103. +121 −130 tensorpipe/util/ringbuffer/producer.h
  104. +0 −95 tensorpipe/util/ringbuffer/protobuf_streams.h
  105. +55 −68 tensorpipe/util/ringbuffer/ringbuffer.h
  106. +32 −34 tensorpipe/util/ringbuffer/shm.cc
  107. +8 −7 tensorpipe/util/ringbuffer/shm.h
  108. +62 −64 tensorpipe/util/shm/segment.cc
  109. +90 −135 tensorpipe/util/shm/segment.h
  110. +1 −0 third_party/libnop
9 changes: 5 additions & 4 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ jobs:
name: Install apt packages
command: |
apt-get update
apt-get install -y git-core build-essential cmake libprotobuf-dev protobuf-compiler << parameters.apt_get >>
apt-get install -y git-core build-essential cmake << parameters.apt_get >>
- run:
name: Initialize submodules
command: |
@@ -46,6 +46,7 @@ jobs:
-DCMAKE_C_COMPILER=<< parameters.c_compiler >> \
-DCMAKE_CXX_COMPILER=<< parameters.cxx_compiler >> \
-DTP_ENABLE_CMA=OFF \
-DTP_ENABLE_CUDA_IPC=OFF \
-DTP_BUILD_TESTING=ON \
<< parameters.cmake_args >>
make -j<<parameters.nproc>>
@@ -67,7 +68,7 @@ jobs:
- run:
name: Install homebrew packages
command: |
brew install cmake protobuf
brew install cmake
- run:
name: Initialize submodules
command: |
@@ -109,7 +110,7 @@ jobs:
name: Install apt packages
command: |
apt-get update
apt-get install -y git-core build-essential cmake libprotobuf-dev protobuf-compiler python3-dev python3-venv << parameters.apt_get >>
apt-get install -y git-core build-essential cmake python3-dev python3-venv << parameters.apt_get >>
- run:
name: Initialize submodules
command: |
@@ -120,7 +121,7 @@ jobs:
command: |
python3 -m venv venv
source venv/bin/activate
TP_ENABLE_CMA=OFF python3 setup.py install
TP_ENABLE_CMA=OFF TP_ENABLE_CUDA_IPC=OFF python3 setup.py install
- run:
name: Test
command: |
3 changes: 3 additions & 0 deletions .gitmodules
@@ -8,3 +8,6 @@
[submodule "third_party/googletest"]
path = third_party/googletest
url = https://github.com/google/googletest.git
[submodule "third_party/libnop"]
path = third_party/libnop
url = https://github.com/google/libnop.git
3 changes: 0 additions & 3 deletions CMakeLists.txt
@@ -12,9 +12,6 @@ set(CMAKE_CXX_STANDARD 14)

list(APPEND CMAKE_MODULE_PATH "${PROJECT_SOURCE_DIR}/cmake")

# Enable protobuf_generate() polyfill.
include(cmake/ProtobufGenerate.cmake)

# Expose build options.
include(Options)

581 changes: 0 additions & 581 deletions cmake/FindProtobuf.cmake

This file was deleted.

7 changes: 7 additions & 0 deletions cmake/Options.cmake
@@ -10,11 +10,18 @@ else()
set(LINUX OFF)
endif()

include(CMakeDependentOption)

# TODO: Default to ON if CUDA available.
option(TP_USE_CUDA "Enable support for CUDA tensors" OFF)

# Transports
option(TP_ENABLE_SHM "Enable shm transport" ${LINUX})

# Channels
option(TP_ENABLE_CMA "Enable cma channel" ${LINUX})
cmake_dependent_option(TP_ENABLE_CUDA_IPC "Enable CUDA IPC channel" ON
"TP_USE_CUDA" OFF)

# Optional features
option(TP_BUILD_BENCHMARK "Build benchmarks" OFF)
127 changes: 0 additions & 127 deletions cmake/ProtobufGenerate.cmake

This file was deleted.

73 changes: 0 additions & 73 deletions cmake/SelectLibraryConfigurations.cmake

This file was deleted.

6 changes: 0 additions & 6 deletions docs/development.md
@@ -7,9 +7,6 @@ TensorPipe uses CMake for its build system.
To build TensorPipe, you need:

* C++14 compatible compiler (GCC >= 5.5 or Clang >= 6)
* Protobuf version 3 (note: if you have compiled protobuf yourself and
installed it at a non-standard location, please see the note about
`CMAKE_PREFIX_PATH` below).

## Clone the repository

@@ -54,9 +51,6 @@ Useful CMake variables:
* `CMAKE_C_FLAGS` -- Additional flags for the C compiler.
* `CMAKE_CXX_FLAGS` -- Additional flags for the C++ compiler.
* `CMAKE_BUILD_TYPE` -- For example: `release`, `debug`.
* `CMAKE_PREFIX_PATH` -- If you have compiled protobuf yourself and
installed it at a non-standard location, you can use this variable
to make CMake find it.

Useful TensorPipe specific variables:

61 changes: 33 additions & 28 deletions tensorpipe/CMakeLists.txt
@@ -11,25 +11,23 @@ add_library(tensorpipe
channel/registry.cc
common/address.cc
common/error.cc
common/fd.cc
common/socket.cc
common/system.cc
core/context.cc
core/error.cc
core/listener.cc
core/pipe.cc
proto/core.proto
transport/connection.cc
transport/error.cc
transport/registry.cc)

# Support `#include <tensorpipe/foo.h>`.
target_include_directories(tensorpipe PUBLIC $<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}>)

# Support `#include <tensorpipe/tensorpipe.h>` and `#include <tensorpipe/proto/foo.pb.h>`.
# Support `#include <tensorpipe/tensorpipe.h>`.
target_include_directories(tensorpipe PUBLIC $<BUILD_INTERFACE:${PROJECT_BINARY_DIR}>)

# Support `#include "proto/foo.pb.h"`, as generated by protoc.
target_include_directories(tensorpipe PRIVATE ${PROJECT_BINARY_DIR}/tensorpipe)


## Channels

@@ -43,16 +41,14 @@ target_sources(tensorpipe PRIVATE

target_sources(tensorpipe PRIVATE
channel/xth/channel.cc
channel/xth/context.cc
proto/channel/xth.proto)
channel/xth/context.cc)

### cma

if(TP_ENABLE_CMA)
target_sources(tensorpipe PRIVATE
channel/cma/channel.cc
channel/cma/context.cc
proto/channel/cma.proto)
channel/cma/context.cc)
set(TENSORPIPE_HAS_CMA_CHANNEL 1)
else()
set(TENSORPIPE_HAS_CMA_CHANNEL 0)
@@ -62,9 +58,25 @@ endif()

target_sources(tensorpipe PRIVATE
channel/mpt/channel.cc
channel/mpt/context.cc
proto/channel/mpt.proto)
channel/mpt/context.cc)

## CUDA channels

if(TP_USE_CUDA)
find_package(CUDA REQUIRED)
target_link_libraries(tensorpipe PUBLIC ${CUDA_LIBRARIES})
target_include_directories(tensorpipe PUBLIC ${CUDA_INCLUDE_DIRS})
set(TENSORPIPE_SUPPORTS_CUDA 1)

### cuda_ipc

if(TP_ENABLE_CUDA_IPC)
target_sources(tensorpipe PRIVATE
channel/cuda_ipc/channel.cc
channel/cuda_ipc/context.cc)
set(TENSORPIPE_HAS_CUDA_IPC_CHANNEL 1)
endif()
endif()

## Transports

@@ -87,11 +99,10 @@ if(TP_ENABLE_SHM)
target_sources(tensorpipe PRIVATE
transport/shm/context.cc
transport/shm/connection.cc
transport/shm/fd.cc
transport/shm/listener.cc
transport/shm/loop.cc
transport/shm/reactor.cc
transport/shm/socket.cc
transport/shm/sockaddr.cc
util/ringbuffer/shm.cc
util/shm/segment.cc)
set(TENSORPIPE_HAS_SHM_TRANSPORT 1)
@@ -107,28 +118,22 @@ if(APPLE)
endif()


## Protobufs

if(NOT TARGET protobuf::libprotobuf)
if(Protobuf_DIR)
find_package(Protobuf 3 REQUIRED HINTS "${Protobuf_DIR}")
else()
find_package(Protobuf 3 REQUIRED)
endif()
endif()

target_link_libraries(tensorpipe PRIVATE protobuf::libprotobuf)
protobuf_generate(TARGET tensorpipe)


if(BUILD_SHARED_LIBS)
set_target_properties(tensorpipe PROPERTIES POSITION_INDEPENDENT_CODE 1)
endif()


## Config

configure_file(tensorpipe.h.in tensorpipe.h)
configure_file(config.h.in config.h)


## Libnop

# We should keep the libnop headers private, as they should not be exposed to
# downstream users, but they are currently transitively included by
# tensorpipe/transport/connection.h (for which it is still unclear whether it
# should be a public or private header).
target_include_directories(tensorpipe PUBLIC $<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}/third_party/libnop/include>)


## Python bindings
4 changes: 2 additions & 2 deletions tensorpipe/benchmark/CMakeLists.txt
@@ -7,7 +7,7 @@
# TODO: Make those separate CMake projects.

add_executable(benchmark_transport benchmark_transport.cc options.cc)
target_link_libraries(benchmark_transport PRIVATE tensorpipe)
target_link_libraries(benchmark_transport PRIVATE -Wl,--whole-archive tensorpipe -Wl,--no-whole-archive)

add_executable(benchmark_pipe benchmark_pipe.cc options.cc)
target_link_libraries(benchmark_pipe PRIVATE tensorpipe)
target_link_libraries(benchmark_pipe PRIVATE -Wl,--whole-archive tensorpipe -Wl,--no-whole-archive)
222 changes: 150 additions & 72 deletions tensorpipe/benchmark/benchmark_pipe.cc
@@ -25,12 +25,18 @@ using namespace tensorpipe::benchmark;
Measurements measurements;

struct Data {
std::unique_ptr<uint8_t[]> expectedPayload;
std::unique_ptr<uint8_t[]> temporaryPayload;
size_t numPayloads;
size_t payloadSize;
std::unique_ptr<uint8_t[]> expectedTensor;
std::unique_ptr<uint8_t[]> temporaryTensor;
std::vector<std::unique_ptr<uint8_t[]>> expectedPayload;
std::vector<std::string> expectedPayloadMetadata;
std::vector<std::unique_ptr<uint8_t[]>> temporaryPayload;

size_t numTensors;
size_t tensorSize;
std::vector<std::unique_ptr<uint8_t[]>> expectedTensor;
std::vector<std::string> expectedTensorMetadata;
std::vector<std::unique_ptr<uint8_t[]>> temporaryTensor;

std::string expectedMetadata;
};

@@ -78,16 +84,27 @@ static void serverPongPingNonBlock(
TP_THROW_ASSERT_IF(error) << error.what();
TP_DCHECK_EQ(message.metadata, data.expectedMetadata);
if (data.payloadSize > 0) {
TP_DCHECK_EQ(message.payloads.size(), 1);
TP_DCHECK_EQ(message.payloads[0].length, data.payloadSize);
message.payloads[0].data = data.temporaryPayload.get();
TP_DCHECK_EQ(message.payloads.size(), data.numPayloads);
for (size_t payloadIdx = 0; payloadIdx < data.numPayloads; payloadIdx++) {
TP_DCHECK_EQ(
message.payloads[payloadIdx].metadata,
data.expectedPayloadMetadata[payloadIdx]);
TP_DCHECK_EQ(message.payloads[payloadIdx].length, data.payloadSize);
message.payloads[payloadIdx].data =
data.temporaryPayload[payloadIdx].get();
}
} else {
TP_DCHECK_EQ(message.payloads.size(), 0);
}
if (data.tensorSize > 0) {
TP_DCHECK_EQ(message.tensors.size(), 1);
TP_DCHECK_EQ(message.tensors[0].length, data.tensorSize);
message.tensors[0].data = data.temporaryTensor.get();
TP_DCHECK_EQ(message.tensors.size(), data.numTensors);
for (size_t tensorIdx = 0; tensorIdx < data.numTensors; tensorIdx++) {
TP_DCHECK_EQ(
message.tensors[tensorIdx].metadata,
data.expectedTensorMetadata[tensorIdx]);
TP_DCHECK_EQ(message.tensors[tensorIdx].length, data.tensorSize);
message.tensors[tensorIdx].data = data.temporaryTensor[tensorIdx].get();
}
} else {
TP_DCHECK_EQ(message.tensors.size(), 0);
}
@@ -97,26 +114,33 @@ static void serverPongPingNonBlock(
const Error& error, Message&& message) {
TP_THROW_ASSERT_IF(error) << error.what();
if (data.payloadSize > 0) {
TP_DCHECK_EQ(message.payloads.size(), 1);
TP_DCHECK_EQ(message.payloads[0].length, data.payloadSize);
TP_DCHECK_EQ(
memcmp(
message.payloads[0].data,
data.expectedPayload.get(),
message.payloads[0].length),
0);
TP_DCHECK_EQ(message.payloads.size(), data.numPayloads);
for (size_t payloadIdx = 0; payloadIdx < data.numPayloads;
payloadIdx++) {
TP_DCHECK_EQ(
message.payloads[payloadIdx].length, data.payloadSize);
TP_DCHECK_EQ(
memcmp(
message.payloads[payloadIdx].data,
data.expectedPayload[payloadIdx].get(),
message.payloads[payloadIdx].length),
0);
}
} else {
TP_DCHECK_EQ(message.payloads.size(), 0);
}
if (data.tensorSize > 0) {
TP_DCHECK_EQ(message.tensors.size(), 1);
TP_DCHECK_EQ(message.tensors[0].length, data.tensorSize);
TP_DCHECK_EQ(
memcmp(
message.tensors[0].data,
data.expectedTensor.get(),
message.tensors[0].length),
0);
TP_DCHECK_EQ(message.tensors.size(), data.numTensors);
for (size_t tensorIdx = 0; tensorIdx < data.numTensors;
tensorIdx++) {
TP_DCHECK_EQ(message.tensors[tensorIdx].length, data.tensorSize);
TP_DCHECK_EQ(
memcmp(
message.tensors[tensorIdx].data,
data.expectedTensor[tensorIdx].get(),
message.tensors[tensorIdx].length),
0);
}
} else {
TP_DCHECK_EQ(message.tensors.size(), 0);
}
@@ -140,15 +164,28 @@ static void serverPongPingNonBlock(
static void runServer(const Options& options) {
std::string addr = options.address;
int numRoundTrips = options.numRoundTrips;
Data data = {
createData(options.payloadSize),
std::make_unique<uint8_t[]>(options.payloadSize),
options.payloadSize,
createData(options.tensorSize),
std::make_unique<uint8_t[]>(options.tensorSize),
options.tensorSize,
std::string(options.metadataSize, 0x42),
};

Data data;
data.numPayloads = options.numPayloads;
data.payloadSize = options.payloadSize;
for (size_t payloadIdx = 0; payloadIdx < options.numPayloads; payloadIdx++) {
data.expectedPayload.push_back(createData(options.payloadSize));
data.expectedPayloadMetadata.push_back(
std::string(options.metadataSize, 0x42));
data.temporaryPayload.push_back(
std::make_unique<uint8_t[]>(options.payloadSize));
}
data.numTensors = options.numTensors;
data.tensorSize = options.tensorSize;
for (size_t tensorIdx = 0; tensorIdx < options.numTensors; tensorIdx++) {
data.expectedTensor.push_back(createData(options.tensorSize));
data.expectedTensorMetadata.push_back(
std::string(options.metadataSize, 0x42));
data.temporaryTensor.push_back(
std::make_unique<uint8_t[]>(options.tensorSize));
}
data.expectedMetadata = std::string(options.metadataSize, 0x42);

Measurements measurements;
measurements.reserve(options.numRoundTrips);

@@ -189,18 +226,22 @@ static void clientPingPongNonBlock(
Message message;
message.metadata = data.expectedMetadata;
if (data.payloadSize > 0) {
Message::Payload payload;
payload.data = data.expectedPayload.get();
payload.length = data.payloadSize;
message.payloads.push_back(std::move(payload));
for (size_t payloadIdx = 0; payloadIdx < data.numPayloads; payloadIdx++) {
Message::Payload payload;
payload.data = data.expectedPayload[payloadIdx].get();
payload.length = data.payloadSize;
message.payloads.push_back(std::move(payload));
}
} else {
TP_DCHECK_EQ(message.payloads.size(), 0);
}
if (data.tensorSize > 0) {
Message::Tensor tensor;
tensor.data = data.expectedTensor.get();
tensor.length = data.tensorSize;
message.tensors.push_back(std::move(tensor));
for (size_t tensorIdx = 0; tensorIdx < data.numTensors; tensorIdx++) {
Message::Tensor tensor;
tensor.data = data.expectedTensor[tensorIdx].get();
tensor.length = data.tensorSize;
message.tensors.push_back(std::move(tensor));
}
} else {
TP_DCHECK_EQ(message.tensors.size(), 0);
}
@@ -215,16 +256,32 @@ static void clientPingPongNonBlock(
TP_THROW_ASSERT_IF(error) << error.what();
TP_DCHECK_EQ(message.metadata, data.expectedMetadata);
if (data.payloadSize > 0) {
TP_DCHECK_EQ(message.payloads.size(), 1);
TP_DCHECK_EQ(message.payloads[0].length, data.payloadSize);
message.payloads[0].data = data.temporaryPayload.get();
TP_DCHECK_EQ(message.payloads.size(), data.numPayloads);
for (size_t payloadIdx = 0; payloadIdx < data.numPayloads;
payloadIdx++) {
TP_DCHECK_EQ(
message.payloads[payloadIdx].metadata,
data.expectedPayloadMetadata[payloadIdx]);
TP_DCHECK_EQ(
message.payloads[payloadIdx].length, data.payloadSize);
message.payloads[payloadIdx].data =
data.temporaryPayload[payloadIdx].get();
}
} else {
TP_DCHECK_EQ(message.payloads.size(), 0);
}
if (data.tensorSize > 0) {
TP_DCHECK_EQ(message.tensors.size(), 1);
TP_DCHECK_EQ(message.tensors[0].length, data.tensorSize);
message.tensors[0].data = data.temporaryTensor.get();
TP_DCHECK_EQ(message.tensors.size(), data.numTensors);
for (size_t tensorIdx = 0; tensorIdx < data.numTensors;
tensorIdx++) {
TP_DCHECK_EQ(
message.tensors[tensorIdx].metadata,
data.expectedTensorMetadata[tensorIdx]);
TP_DCHECK_EQ(
message.tensors[tensorIdx].length, data.tensorSize);
message.tensors[tensorIdx].data =
data.temporaryTensor[tensorIdx].get();
}
} else {
TP_DCHECK_EQ(message.tensors.size(), 0);
}
@@ -235,24 +292,30 @@ static void clientPingPongNonBlock(
measurements.markStop();
TP_THROW_ASSERT_IF(error) << error.what();
if (data.payloadSize > 0) {
TP_DCHECK_EQ(message.payloads.size(), 1);
TP_DCHECK_EQ(
memcmp(
message.payloads[0].data,
data.expectedPayload.get(),
message.payloads[0].length),
0);
TP_DCHECK_EQ(message.payloads.size(), data.numPayloads);
for (size_t payloadIdx = 0; payloadIdx < data.numPayloads;
payloadIdx++) {
TP_DCHECK_EQ(
memcmp(
message.payloads[payloadIdx].data,
data.expectedPayload[payloadIdx].get(),
message.payloads[payloadIdx].length),
0);
}
} else {
TP_DCHECK_EQ(message.payloads.size(), 0);
}
if (data.tensorSize > 0) {
TP_DCHECK_EQ(message.tensors.size(), 1);
TP_DCHECK_EQ(
memcmp(
message.tensors[0].data,
data.expectedTensor.get(),
message.tensors[0].length),
0);
TP_DCHECK_EQ(message.tensors.size(), data.numTensors);
for (size_t tensorIdx = 0; tensorIdx < data.numTensors;
tensorIdx++) {
TP_DCHECK_EQ(
memcmp(
message.tensors[tensorIdx].data,
data.expectedTensor[tensorIdx].get(),
message.tensors[tensorIdx].length),
0);
}
} else {
TP_DCHECK_EQ(message.tensors.size(), 0);
}
@@ -272,15 +335,28 @@ static void clientPingPongNonBlock(
static void runClient(const Options& options) {
std::string addr = options.address;
int numRoundTrips = options.numRoundTrips;
Data data = {
createData(options.payloadSize),
std::make_unique<uint8_t[]>(options.payloadSize),
options.payloadSize,
createData(options.tensorSize),
std::make_unique<uint8_t[]>(options.tensorSize),
options.tensorSize,
std::string(options.metadataSize, 0x42),
};

Data data;
data.numPayloads = options.numPayloads;
data.payloadSize = options.payloadSize;
for (size_t payloadIdx = 0; payloadIdx < options.numPayloads; payloadIdx++) {
data.expectedPayload.push_back(createData(options.payloadSize));
data.expectedPayloadMetadata.push_back(
std::string(options.metadataSize, 0x42));
data.temporaryPayload.push_back(
std::make_unique<uint8_t[]>(options.payloadSize));
}
data.numTensors = options.numTensors;
data.tensorSize = options.tensorSize;
for (size_t tensorIdx = 0; tensorIdx < options.numTensors; tensorIdx++) {
data.expectedTensor.push_back(createData(options.tensorSize));
data.expectedTensorMetadata.push_back(
std::string(options.metadataSize, 0x42));
data.temporaryTensor.push_back(
std::make_unique<uint8_t[]>(options.tensorSize));
}
data.expectedMetadata = std::string(options.metadataSize, 0x42);

Measurements measurements;
measurements.reserve(options.numRoundTrips);

@@ -311,7 +387,9 @@ int main(int argc, char** argv) {
std::cout << "channel = " << x.channel << "\n";
std::cout << "address = " << x.address << "\n";
std::cout << "num_round_trips = " << x.numRoundTrips << "\n";
std::cout << "num_payloads = " << x.numPayloads << "\n";
std::cout << "payload_size = " << x.payloadSize << "\n";
std::cout << "num_tensors = " << x.numTensors << "\n";
std::cout << "tensor_size = " << x.tensorSize << "\n";
std::cout << "metadata_size = " << x.metadataSize << "\n";

14 changes: 13 additions & 1 deletion tensorpipe/benchmark/options.cc
@@ -59,7 +59,9 @@ static void usage(int status, const char* argv0) {
X("--channel=CHANNEL Channel backend [basic]");
X("--address=ADDRESS Address to listen or connect to");
X("--num-round-trips=NUM Number of write/read pairs to perform");
X("--num-payloads=NUM [optional] Number of payloads of each write/read pair");
X("--payload-size=SIZE [optional] Size of payload of each write/read pair");
X("--num-tensors=NUM [optional] Number of tensors of each write/read pair");
X("--tensor-size=SIZE [optional] Size of tensor of each write/read pair");
X("--metadata-size=SIZE [optional] Size of metadata of each write/read pair");

@@ -81,7 +83,7 @@ static void validateOptions(Options options, const char* argv0) {
status = EXIT_FAILURE;
}
if (options.numRoundTrips <= 0) {
fprintf(stderr, "Missing argument: --io-num must be set\n");
fprintf(stderr, "Missing argument: --num-round-trips must be set\n");
status = EXIT_FAILURE;
}
if (status != EXIT_SUCCESS) {
@@ -100,7 +102,9 @@ struct Options parseOptions(int argc, char** argv) {
CHANNEL,
ADDRESS,
NUM_ROUND_TRIPS,
NUM_PAYLOADS,
PAYLOAD_SIZE,
NUM_TENSORS,
TENSOR_SIZE,
METADATA_SIZE,
HELP,
@@ -112,7 +116,9 @@ struct Options parseOptions(int argc, char** argv) {
{"channel", required_argument, &flag, CHANNEL},
{"address", required_argument, &flag, ADDRESS},
{"num-round-trips", required_argument, &flag, NUM_ROUND_TRIPS},
{"num-payloads", required_argument, &flag, NUM_PAYLOADS},
{"payload-size", required_argument, &flag, PAYLOAD_SIZE},
{"num-tensors", required_argument, &flag, NUM_TENSORS},
{"tensor-size", required_argument, &flag, TENSOR_SIZE},
{"metadata-size", required_argument, &flag, METADATA_SIZE},
{"help", no_argument, &flag, HELP},
@@ -148,9 +154,15 @@ struct Options parseOptions(int argc, char** argv) {
case NUM_ROUND_TRIPS:
options.numRoundTrips = atoi(optarg);
break;
case NUM_PAYLOADS:
options.numPayloads = atoi(optarg);
break;
case PAYLOAD_SIZE:
options.payloadSize = atoi(optarg);
break;
case NUM_TENSORS:
options.numTensors = atoi(optarg);
break;
case TENSOR_SIZE:
options.tensorSize = atoi(optarg);
break;
2 changes: 2 additions & 0 deletions tensorpipe/benchmark/options.h
@@ -22,7 +22,9 @@ struct Options {
std::string channel; // basic
std::string address; // address for listen or connect
int numRoundTrips{0}; // number of write/read pairs
size_t numPayloads{0};
size_t payloadSize{0};
size_t numTensors{0};
size_t tensorSize{0};
size_t metadataSize{0};
};
56 changes: 20 additions & 36 deletions tensorpipe/channel/basic/channel.cc
@@ -33,16 +33,11 @@ class Channel::Impl : public std::enable_shared_from_this<Channel::Impl> {
void init();

void send(
const void* ptr,
size_t length,
CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback);

void recv(
TDescriptor descriptor,
void* ptr,
size_t length,
TRecvCallback callback);
void recv(TDescriptor descriptor, CpuBuffer buffer, TRecvCallback callback);

// Tell the channel what its identifier is.
void setId(std::string id);
@@ -53,16 +48,14 @@ class Channel::Impl : public std::enable_shared_from_this<Channel::Impl> {
OnDemandLoop loop_;

void sendFromLoop_(
const void* ptr,
size_t length,
CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback);

// Receive memory region from peer.
void recvFromLoop_(
TDescriptor descriptor,
void* ptr,
size_t length,
CpuBuffer buffer,
TRecvCallback callback);

void setIdFromLoop_(std::string id);
@@ -126,32 +119,27 @@ Channel::Impl::Impl(
id_(std::move(id)) {}

void Channel::send(
const void* ptr,
size_t length,
CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback) {
impl_->send(ptr, length, std::move(descriptorCallback), std::move(callback));
impl_->send(buffer, std::move(descriptorCallback), std::move(callback));
}

void Channel::Impl::send(
const void* ptr,
size_t length,
CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback) {
loop_.deferToLoop([this,
ptr,
length,
buffer,
descriptorCallback{std::move(descriptorCallback)},
callback{std::move(callback)}]() mutable {
sendFromLoop_(
ptr, length, std::move(descriptorCallback), std::move(callback));
sendFromLoop_(buffer, std::move(descriptorCallback), std::move(callback));
});
}

// Send memory region to peer.
void Channel::Impl::sendFromLoop_(
const void* ptr,
size_t length,
CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback) {
TP_DCHECK(loop_.inLoop());
@@ -191,8 +179,8 @@ void Channel::Impl::sendFromLoop_(
TP_VLOG(6) << "Channel " << id_ << " is writing payload (#" << sequenceNumber
<< ")";
connection_->write(
ptr,
length,
buffer.ptr,
buffer.length,
eagerCallbackWrapper_(
[sequenceNumber, callback{std::move(callback)}](Impl& impl) {
TP_VLOG(6) << "Channel " << impl.id_ << " done writing payload (#"
@@ -206,30 +194,26 @@ void Channel::Impl::sendFromLoop_(
// Receive memory region from peer.
void Channel::recv(
TDescriptor descriptor,
void* ptr,
size_t length,
CpuBuffer buffer,
TRecvCallback callback) {
impl_->recv(std::move(descriptor), ptr, length, std::move(callback));
impl_->recv(std::move(descriptor), buffer, std::move(callback));
}

void Channel::Impl::recv(
TDescriptor descriptor,
void* ptr,
size_t length,
CpuBuffer buffer,
TRecvCallback callback) {
loop_.deferToLoop([this,
descriptor{std::move(descriptor)},
ptr,
length,
buffer,
callback{std::move(callback)}]() mutable {
recvFromLoop_(std::move(descriptor), ptr, length, std::move(callback));
recvFromLoop_(std::move(descriptor), buffer, std::move(callback));
});
}

void Channel::Impl::recvFromLoop_(
TDescriptor descriptor,
void* ptr,
size_t length,
CpuBuffer buffer,
TRecvCallback callback) {
TP_DCHECK(loop_.inLoop());

@@ -257,8 +241,8 @@ void Channel::Impl::recvFromLoop_(
TP_VLOG(6) << "Channel " << id_ << " is reading payload (#" << sequenceNumber
<< ")";
connection_->read(
ptr,
length,
buffer.ptr,
buffer.length,
eagerCallbackWrapper_(
[sequenceNumber, callback{std::move(callback)}](
Impl& impl, const void* /* unused */, size_t /* unused */) {
14 changes: 5 additions & 9 deletions tensorpipe/channel/basic/channel.h
@@ -11,13 +11,13 @@
#include <memory>

#include <tensorpipe/channel/basic/context.h>
#include <tensorpipe/channel/channel.h>
#include <tensorpipe/channel/cpu_context.h>

namespace tensorpipe {
namespace channel {
namespace basic {

class Channel : public channel::Channel {
class Channel : public channel::CpuChannel {
// Use the passkey idiom to allow make_shared to call what should be a private
// constructor. See https://abseil.io/tips/134 for more information.
struct ConstructorToken {};
@@ -31,17 +31,13 @@ class Channel : public channel::Channel {

// Send memory region to peer.
void send(
const void* ptr,
size_t length,
CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback) override;

// Receive memory region from peer.
void recv(
TDescriptor descriptor,
void* ptr,
size_t length,
TRecvCallback callback) override;
void recv(TDescriptor descriptor, CpuBuffer buffer, TRecvCallback callback)
override;

// Tell the channel what its identifier is.
void setId(std::string id) override;
12 changes: 6 additions & 6 deletions tensorpipe/channel/basic/context.cc
@@ -41,9 +41,9 @@ class Context::Impl : public Context::PrivateIface,

const std::string& domainDescriptor() const;

std::shared_ptr<channel::Channel> createChannel(
std::shared_ptr<channel::CpuChannel> createChannel(
std::shared_ptr<transport::Connection>,
Channel::Endpoint);
Endpoint);

void setId(std::string id);

@@ -88,15 +88,15 @@ const std::string& Context::Impl::domainDescriptor() const {
return domainDescriptor_;
}

std::shared_ptr<channel::Channel> Context::createChannel(
std::shared_ptr<channel::CpuChannel> Context::createChannel(
std::shared_ptr<transport::Connection> connection,
Channel::Endpoint endpoint) {
Endpoint endpoint) {
return impl_->createChannel(std::move(connection), endpoint);
}

std::shared_ptr<channel::Channel> Context::Impl::createChannel(
std::shared_ptr<channel::CpuChannel> Context::Impl::createChannel(
std::shared_ptr<transport::Connection> connection,
Channel::Endpoint /* unused */) {
Endpoint /* unused */) {
std::string channelId = id_ + ".c" + std::to_string(channelCounter_++);
TP_VLOG(4) << "Channel context " << id_ << " is opening channel "
<< channelId;
8 changes: 4 additions & 4 deletions tensorpipe/channel/basic/context.h
@@ -11,22 +11,22 @@
#include <memory>
#include <string>

#include <tensorpipe/channel/context.h>
#include <tensorpipe/channel/cpu_context.h>
#include <tensorpipe/common/callback.h>

namespace tensorpipe {
namespace channel {
namespace basic {

class Context : public channel::Context {
class Context : public channel::CpuContext {
public:
Context();

const std::string& domainDescriptor() const override;

std::shared_ptr<Channel> createChannel(
std::shared_ptr<CpuChannel> createChannel(
std::shared_ptr<transport::Connection>,
Channel::Endpoint) override;
Endpoint) override;

void setId(std::string id) override;

21 changes: 10 additions & 11 deletions tensorpipe/channel/channel.h
@@ -45,28 +45,27 @@
namespace tensorpipe {
namespace channel {

enum class Endpoint : bool { kConnect, kListen };

using TDescriptor = std::string;
using TDescriptorCallback = std::function<void(const Error&, TDescriptor)>;
using TSendCallback = std::function<void(const Error&)>;
using TRecvCallback = std::function<void(const Error&)>;

// Abstract base class for channel classes.
template <typename TBuffer>
class Channel {
public:
using TDescriptor = std::string;
using TDescriptorCallback = std::function<void(const Error&, TDescriptor)>;
using TSendCallback = std::function<void(const Error&)>;
using TRecvCallback = std::function<void(const Error&)>;

enum class Endpoint : bool { kConnect, kListen };

// Send memory region to peer.
virtual void send(
const void* ptr,
size_t length,
TBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback) = 0;

// Receive memory region from peer.
virtual void recv(
TDescriptor descriptor,
void* ptr,
size_t length,
TBuffer buffer,
TRecvCallback callback) = 0;

// Tell the channel what its identifier is.
84 changes: 41 additions & 43 deletions tensorpipe/channel/cma/channel.cc
@@ -17,6 +17,9 @@
#include <list>
#include <mutex>

#include <nop/serializer.h>
#include <nop/structure.h>

#include <tensorpipe/channel/error.h>
#include <tensorpipe/channel/helpers.h>
#include <tensorpipe/common/callback.h>
@@ -26,12 +29,21 @@
#include <tensorpipe/common/optional.h>
#include <tensorpipe/common/queue.h>
#include <tensorpipe/common/system.h>
#include <tensorpipe/proto/channel/cma.pb.h>

namespace tensorpipe {
namespace channel {
namespace cma {

namespace {

struct Descriptor {
uint32_t pid;
uint64_t ptr;
NOP_STRUCTURE(Descriptor, pid, ptr);
};

} // namespace

class Channel::Impl : public std::enable_shared_from_this<Channel::Impl> {
public:
Impl(
@@ -43,16 +55,11 @@ class Channel::Impl : public std::enable_shared_from_this<Channel::Impl> {
void init();

void send(
const void* ptr,
size_t length,
CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback);

void recv(
TDescriptor descriptor,
void* ptr,
size_t length,
TRecvCallback callback);
void recv(TDescriptor descriptor, CpuBuffer buffer, TRecvCallback callback);

// Tell the channel what its identifier is.
void setId(std::string id);
@@ -66,16 +73,14 @@ class Channel::Impl : public std::enable_shared_from_this<Channel::Impl> {

// Send memory region to peer.
void sendFromLoop_(
const void* ptr,
size_t length,
CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback);

// Receive memory region from peer.
void recvFromLoop_(
TDescriptor descriptor,
void* ptr,
size_t length,
CpuBuffer buffer,
TRecvCallback callback);

void setIdFromLoop_(std::string id);
@@ -146,31 +151,26 @@ void Channel::Impl::initFromLoop_() {
}

void Channel::send(
const void* ptr,
size_t length,
CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback) {
impl_->send(ptr, length, std::move(descriptorCallback), std::move(callback));
impl_->send(buffer, std::move(descriptorCallback), std::move(callback));
}

void Channel::Impl::send(
const void* ptr,
size_t length,
CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback) {
loop_.deferToLoop([this,
ptr,
length,
buffer,
descriptorCallback{std::move(descriptorCallback)},
callback{std::move(callback)}]() mutable {
sendFromLoop_(
ptr, length, std::move(descriptorCallback), std::move(callback));
sendFromLoop_(buffer, std::move(descriptorCallback), std::move(callback));
});
}

void Channel::Impl::sendFromLoop_(
const void* ptr,
size_t length,
CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback) {
TP_DCHECK(loop_.inLoop());
@@ -221,40 +221,37 @@ void Channel::Impl::sendFromLoop_(
callback(impl.error_);
}));

proto::Descriptor pbDescriptor;
pbDescriptor.set_pid(getpid());
pbDescriptor.set_ptr(reinterpret_cast<uint64_t>(ptr));
NopHolder<Descriptor> nopHolder;
Descriptor& nopDescriptor = nopHolder.getObject();
nopDescriptor.pid = getpid();
nopDescriptor.ptr = reinterpret_cast<uint64_t>(buffer.ptr);

descriptorCallback(Error::kSuccess, saveDescriptor(pbDescriptor));
descriptorCallback(Error::kSuccess, saveDescriptor(nopHolder));
}

// Receive memory region from peer.
void Channel::recv(
TDescriptor descriptor,
void* ptr,
size_t length,
CpuBuffer buffer,
TRecvCallback callback) {
impl_->recv(std::move(descriptor), ptr, length, std::move(callback));
impl_->recv(std::move(descriptor), buffer, std::move(callback));
}

void Channel::Impl::recv(
TDescriptor descriptor,
void* ptr,
size_t length,
CpuBuffer buffer,
TRecvCallback callback) {
loop_.deferToLoop([this,
descriptor{std::move(descriptor)},
ptr,
length,
buffer,
callback{std::move(callback)}]() mutable {
recvFromLoop_(std::move(descriptor), ptr, length, std::move(callback));
recvFromLoop_(std::move(descriptor), buffer, std::move(callback));
});
}

void Channel::Impl::recvFromLoop_(
TDescriptor descriptor,
void* ptr,
size_t length,
CpuBuffer buffer,
TRecvCallback callback) {
TP_DCHECK(loop_.inLoop());

@@ -277,18 +274,19 @@ void Channel::Impl::recvFromLoop_(
return;
}

proto::Descriptor pbDescriptor;
loadDescriptor(pbDescriptor, descriptor);
pid_t remotePid = pbDescriptor.pid();
void* remotePtr = reinterpret_cast<void*>(pbDescriptor.ptr());
NopHolder<Descriptor> nopHolder;
loadDescriptor(nopHolder, descriptor);
Descriptor& nopDescriptor = nopHolder.getObject();
pid_t remotePid = nopDescriptor.pid;
void* remotePtr = reinterpret_cast<void*>(nopDescriptor.ptr);

TP_VLOG(6) << "Channel " << id_ << " is copying payload (#" << sequenceNumber
<< ")";
context_->requestCopy(
remotePid,
remotePtr,
ptr,
length,
buffer.ptr,
buffer.length,
eagerCallbackWrapper_([sequenceNumber,
callback{std::move(callback)}](Impl& impl) {
TP_VLOG(6) << "Channel " << impl.id_ << " done copying payload (#"
14 changes: 5 additions & 9 deletions tensorpipe/channel/cma/channel.h
@@ -10,14 +10,14 @@

#include <memory>

#include <tensorpipe/channel/channel.h>
#include <tensorpipe/channel/cma/context.h>
#include <tensorpipe/channel/cpu_context.h>

namespace tensorpipe {
namespace channel {
namespace cma {

class Channel : public channel::Channel {
class Channel : public channel::CpuChannel {
// Use the passkey idiom to allow make_shared to call what should be a private
// constructor. See https://abseil.io/tips/134 for more information.
struct ConstructorToken {};
@@ -31,17 +31,13 @@ class Channel : public channel::Channel {

// Send memory region to peer.
void send(
const void* ptr,
size_t length,
CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback) override;

// Receive memory region from peer.
void recv(
TDescriptor descriptor,
void* ptr,
size_t length,
TRecvCallback callback) override;
void recv(TDescriptor descriptor, CpuBuffer buffer, TRecvCallback callback)
override;

// Tell the channel what its identifier is.
void setId(std::string id) override;
12 changes: 6 additions & 6 deletions tensorpipe/channel/cma/context.cc
@@ -73,9 +73,9 @@ class Context::Impl : public Context::PrivateIface,

const std::string& domainDescriptor() const;

std::shared_ptr<channel::Channel> createChannel(
std::shared_ptr<channel::CpuChannel> createChannel(
std::shared_ptr<transport::Connection>,
Channel::Endpoint);
Endpoint);

void setId(std::string id);

@@ -193,15 +193,15 @@ const std::string& Context::Impl::domainDescriptor() const {
return domainDescriptor_;
}

std::shared_ptr<channel::Channel> Context::createChannel(
std::shared_ptr<channel::CpuChannel> Context::createChannel(
std::shared_ptr<transport::Connection> connection,
Channel::Endpoint endpoint) {
Endpoint endpoint) {
return impl_->createChannel(std::move(connection), endpoint);
}

std::shared_ptr<channel::Channel> Context::Impl::createChannel(
std::shared_ptr<channel::CpuChannel> Context::Impl::createChannel(
std::shared_ptr<transport::Connection> connection,
Channel::Endpoint /* unused */) {
Endpoint /* unused */) {
TP_THROW_ASSERT_IF(joined_);
std::string channelId = id_ + ".c" + std::to_string(channelCounter_++);
TP_VLOG(4) << "Channel context " << id_ << " is opening channel "
8 changes: 4 additions & 4 deletions tensorpipe/channel/cma/context.h
@@ -12,23 +12,23 @@
#include <memory>
#include <string>

#include <tensorpipe/channel/context.h>
#include <tensorpipe/channel/cpu_context.h>
#include <tensorpipe/common/callback.h>
#include <tensorpipe/common/error.h>

namespace tensorpipe {
namespace channel {
namespace cma {

class Context : public channel::Context {
class Context : public channel::CpuContext {
public:
Context();

const std::string& domainDescriptor() const override;

std::shared_ptr<Channel> createChannel(
std::shared_ptr<CpuChannel> createChannel(
std::shared_ptr<transport::Connection>,
Channel::Endpoint) override;
Endpoint) override;

void setId(std::string id) override;

5 changes: 3 additions & 2 deletions tensorpipe/channel/context.h
@@ -25,6 +25,7 @@ namespace channel {
// context. All registered instances are assumed to be eligible
// channels for all pairs.
//
template <typename TBuffer>
class Context {
public:
// Return string to describe the domain for this channel.
@@ -42,9 +43,9 @@ class Context {
// initialized yet, take care to queue these operations to execute
// as soon as initialization has completed.
//
virtual std::shared_ptr<Channel> createChannel(
virtual std::shared_ptr<Channel<TBuffer>> createChannel(
std::shared_ptr<transport::Connection>,
Channel::Endpoint) = 0;
Endpoint) = 0;

// Tell the context what its identifier is.
//
22 changes: 22 additions & 0 deletions tensorpipe/channel/cpu_context.h
@@ -0,0 +1,22 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <tensorpipe/channel/channel.h>
#include <tensorpipe/channel/context.h>
#include <tensorpipe/common/cpu_buffer.h>

namespace tensorpipe {
namespace channel {

using CpuChannel = Channel<CpuBuffer>;
using CpuContext = Context<CpuBuffer>;

} // namespace channel
} // namespace tensorpipe
22 changes: 22 additions & 0 deletions tensorpipe/channel/cuda_context.h
@@ -0,0 +1,22 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <tensorpipe/channel/channel.h>
#include <tensorpipe/channel/context.h>
#include <tensorpipe/common/cuda_buffer.h>

namespace tensorpipe {
namespace channel {

using CudaChannel = Channel<CudaBuffer>;
using CudaContext = Context<CudaBuffer>;

} // namespace channel
} // namespace tensorpipe
567 changes: 567 additions & 0 deletions tensorpipe/channel/cuda_ipc/channel.cc

64 changes: 64 additions & 0 deletions tensorpipe/channel/cuda_ipc/channel.h
@@ -0,0 +1,64 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <memory>

#include <cuda_runtime.h>

#include <tensorpipe/channel/cuda_context.h>
#include <tensorpipe/channel/cuda_ipc/context.h>

namespace tensorpipe {
namespace channel {
namespace cuda_ipc {

class Channel : public channel::CudaChannel {
// Use the passkey idiom to allow make_shared to call what should be a private
// constructor. See https://abseil.io/tips/134 for more information.
struct ConstructorToken {};

public:
Channel(
ConstructorToken,
std::shared_ptr<Context::PrivateIface>,
std::shared_ptr<transport::Connection> connection,
std::string id);

// Send memory region to peer.
void send(
CudaBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback) override;

// Receive memory region from peer.
void recv(TDescriptor descriptor, CudaBuffer buffer, TRecvCallback callback)
override;

// Tell the channel what its identifier is.
void setId(std::string id) override;

void close() override;

~Channel() override;

private:
class Impl;

// Using a shared_ptr allows us to detach the lifetime of the implementation
// from that of the public object and perform the destruction asynchronously.
std::shared_ptr<Impl> impl_;

// Allow context to access constructor token.
friend class Context;
};

} // namespace cuda_ipc
} // namespace channel
} // namespace tensorpipe
156 changes: 156 additions & 0 deletions tensorpipe/channel/cuda_ipc/context.cc
@@ -0,0 +1,156 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

#include <tensorpipe/channel/cuda_ipc/context.h>

#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

#include <algorithm>
#include <limits>
#include <list>

#include <tensorpipe/channel/cuda_ipc/channel.h>
#include <tensorpipe/channel/error.h>
#include <tensorpipe/channel/helpers.h>
#include <tensorpipe/channel/registry.h>
#include <tensorpipe/common/callback.h>
#include <tensorpipe/common/defs.h>
#include <tensorpipe/common/error_macros.h>
#include <tensorpipe/common/optional.h>
#include <tensorpipe/common/queue.h>
#include <tensorpipe/common/system.h>

namespace tensorpipe {
namespace channel {
namespace cuda_ipc {

namespace {

std::string generateDomainDescriptor() {
std::ostringstream oss;
auto bootID = getBootID();
TP_THROW_ASSERT_IF(!bootID) << "Unable to read boot_id";
oss << bootID.value();
return oss.str();
}

} // namespace

class Context::Impl : public Context::PrivateIface,
public std::enable_shared_from_this<Context::Impl> {
public:
Impl();

const std::string& domainDescriptor() const;

std::shared_ptr<channel::CudaChannel> createChannel(
std::shared_ptr<transport::Connection>,
Endpoint);

void setId(std::string id);

ClosingEmitter& getClosingEmitter() override;

using copy_request_callback_fn = std::function<void(const Error&)>;

void close();

void join();

~Impl() override = default;

private:
std::string domainDescriptor_;
std::atomic<bool> closed_{false};
std::atomic<bool> joined_{false};
ClosingEmitter closingEmitter_;

// An identifier for this context, combining the high-level context's
// identifier with the channel's name. It is only used for logging and
// debugging purposes.
std::string id_{"N/A"};

// Sequence number for the channels created by this context, used to derive
// their identifiers from this context's identifier. It is only used for
// logging and debugging.
std::atomic<uint64_t> channelCounter_{0};
};

Context::Context() : impl_(std::make_shared<Context::Impl>()) {}

Context::Impl::Impl() : domainDescriptor_(generateDomainDescriptor()) {}

void Context::close() {
impl_->close();
}

void Context::Impl::close() {
if (!closed_.exchange(true)) {
closingEmitter_.close();
}
}

void Context::join() {
impl_->join();
}

void Context::Impl::join() {
close();

joined_.exchange(true);
}

Context::~Context() {
join();
}

void Context::setId(std::string id) {
impl_->setId(std::move(id));
}

void Context::Impl::setId(std::string id) {
id_ = std::move(id);
}

ClosingEmitter& Context::Impl::getClosingEmitter() {
return closingEmitter_;
}

const std::string& Context::domainDescriptor() const {
return impl_->domainDescriptor();
}

const std::string& Context::Impl::domainDescriptor() const {
return domainDescriptor_;
}

std::shared_ptr<channel::CudaChannel> Context::createChannel(
std::shared_ptr<transport::Connection> connection,
Endpoint endpoint) {
return impl_->createChannel(std::move(connection), endpoint);
}

std::shared_ptr<channel::CudaChannel> Context::Impl::createChannel(
std::shared_ptr<transport::Connection> connection,
Endpoint /* unused */) {
TP_THROW_ASSERT_IF(joined_);
std::string channelId = id_ + ".c" + std::to_string(channelCounter_++);
TP_VLOG(4) << "Channel context " << id_ << " is opening channel "
<< channelId;
return std::make_shared<Channel>(
Channel::ConstructorToken(),
std::static_pointer_cast<PrivateIface>(shared_from_this()),
std::move(connection),
std::move(channelId));
}

} // namespace cuda_ipc
} // namespace channel
} // namespace tensorpipe
63 changes: 63 additions & 0 deletions tensorpipe/channel/cuda_ipc/context.h
@@ -0,0 +1,63 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <functional>
#include <memory>
#include <string>

#include <tensorpipe/channel/cuda_context.h>
#include <tensorpipe/common/callback.h>
#include <tensorpipe/common/error.h>

namespace tensorpipe {
namespace channel {
namespace cuda_ipc {

class Context : public channel::CudaContext {
public:
Context();

const std::string& domainDescriptor() const override;

std::shared_ptr<CudaChannel> createChannel(
std::shared_ptr<transport::Connection>,
Endpoint) override;

void setId(std::string id) override;

void close() override;

void join() override;

~Context() override;

private:
class PrivateIface {
public:
virtual ClosingEmitter& getClosingEmitter() = 0;

virtual ~PrivateIface() = default;
};

class Impl;

// The implementation is managed by a shared_ptr because each child object
// will also hold a shared_ptr to it (downcast as a shared_ptr to the private
// interface). However, its lifetime is tied to that of this public object:
// when the latter is destroyed, the implementation is closed and joined.
std::shared_ptr<Impl> impl_;

// Allow channel to see the private interface.
friend class Channel;
};

} // namespace cuda_ipc
} // namespace channel
} // namespace tensorpipe
27 changes: 18 additions & 9 deletions tensorpipe/channel/helpers.cc
@@ -9,22 +9,31 @@
#include <tensorpipe/channel/helpers.h>

#include <tensorpipe/common/defs.h>
#include <tensorpipe/common/nop.h>

namespace tensorpipe {
namespace channel {

Channel::TDescriptor saveDescriptor(const google::protobuf::MessageLite& pb) {
Channel::TDescriptor out;
const auto success = pb.SerializeToString(&out);
TP_DCHECK(success) << "Failed to serialize protobuf message";
TDescriptor saveDescriptor(const AbstractNopHolder& object) {
const size_t len = object.getSize();
TDescriptor out(len, '\0');
NopWriter writer(
const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(out.data())), len);

nop::Status<void> status = object.write(writer);
TP_THROW_ASSERT_IF(status.has_error())
<< "Error saving descriptor: " << status.GetErrorMessage();

return out;
}

void loadDescriptor(
google::protobuf::MessageLite& pb,
const Channel::TDescriptor& in) {
const auto success = pb.ParseFromString(in);
TP_DCHECK(success) << "Failed to parse protobuf message";
void loadDescriptor(AbstractNopHolder& object, const TDescriptor& in) {
const size_t len = in.size();
NopReader reader(reinterpret_cast<const uint8_t*>(in.data()), len);

nop::Status<void> status = object.read(reader);
TP_THROW_ASSERT_IF(status.has_error())
<< "Error loading descriptor: " << status.GetErrorMessage();
}

} // namespace channel
9 changes: 3 additions & 6 deletions tensorpipe/channel/helpers.h
@@ -10,18 +10,15 @@

// Note: never include this file from headers!

#include <google/protobuf/message_lite.h>

#include <tensorpipe/channel/channel.h>
#include <tensorpipe/common/nop.h>

namespace tensorpipe {
namespace channel {

Channel::TDescriptor saveDescriptor(const google::protobuf::MessageLite& pb);
TDescriptor saveDescriptor(const AbstractNopHolder& object);

void loadDescriptor(
google::protobuf::MessageLite& pb,
const Channel::TDescriptor& in);
void loadDescriptor(AbstractNopHolder& object, const TDescriptor& in);

} // namespace channel
} // namespace tensorpipe
137 changes: 64 additions & 73 deletions tensorpipe/channel/mpt/channel.cc
@@ -13,10 +13,10 @@

#include <tensorpipe/channel/error.h>
#include <tensorpipe/channel/helpers.h>
#include <tensorpipe/channel/mpt/nop_types.h>
#include <tensorpipe/common/callback.h>
#include <tensorpipe/common/defs.h>
#include <tensorpipe/common/error_macros.h>
#include <tensorpipe/proto/channel/mpt.pb.h>
#include <tensorpipe/transport/context.h>
#include <tensorpipe/transport/error.h>
#include <tensorpipe/transport/listener.h>
@@ -33,7 +33,7 @@ struct SendOperation {
const void* ptr;
size_t length;
int64_t numChunksBeingWritten{0};
Channel::TSendCallback callback;
TSendCallback callback;
};

// State capturing a single recv operation.
@@ -42,7 +42,7 @@ struct RecvOperation {
void* ptr;
size_t length;
int64_t numChunksBeingRead{0};
Channel::TRecvCallback callback;
TRecvCallback callback;
};

} // namespace
@@ -60,16 +60,11 @@ class Channel::Impl : public std::enable_shared_from_this<Channel::Impl> {
void init();

void send(
const void* ptr,
size_t length,
CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback);

void recv(
TDescriptor descriptor,
void* ptr,
size_t length,
TRecvCallback callback);
void recv(TDescriptor descriptor, CpuBuffer buffer, TRecvCallback callback);

// Tell the channel what its identifier is.
void setId(std::string id);
@@ -87,23 +82,21 @@ class Channel::Impl : public std::enable_shared_from_this<Channel::Impl> {
void initFromLoop_();

void sendFromLoop_(
const void* ptr,
size_t length,
CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback);

void recvFromLoop_(
TDescriptor descriptor,
void* ptr,
size_t length,
CpuBuffer buffer,
TRecvCallback callback);

void setIdFromLoop_(std::string id);

void closeFromLoop_();

// Called when client reads the server's hello on backbone connection
void onClientReadHelloOnConnection_(const proto::Packet& pbPacketIn);
void onClientReadHelloOnConnection_(const Packet& nopPacketIn);

// Called when server accepts new client connection for lane
void onServerAcceptOfLane_(
@@ -209,23 +202,27 @@ void Channel::Impl::initFromLoop_() {
TP_DCHECK_EQ(state_, UNINITIALIZED);
if (endpoint_ == Endpoint::kConnect) {
state_ = CLIENT_READING_HELLO;
auto packet = std::make_shared<proto::Packet>();
TP_VLOG(6) << "Channel " << id_ << " reading proto (server hello)";
connection_->read(*packet, lazyCallbackWrapper_([packet](Impl& impl) {
TP_VLOG(6) << "Channel " << impl.id_
<< " done reading proto (server hello)";
impl.onClientReadHelloOnConnection_(*packet);
}));
auto nopHolderIn = std::make_shared<NopHolder<Packet>>();
TP_VLOG(6) << "Channel " << id_ << " reading nop object (server hello)";
connection_->read(
*nopHolderIn, lazyCallbackWrapper_([nopHolderIn](Impl& impl) {
TP_VLOG(6) << "Channel " << impl.id_
<< " done reading nop object (server hello)";
impl.onClientReadHelloOnConnection_(nopHolderIn->getObject());
}));
} else if (endpoint_ == Endpoint::kListen) {
state_ = SERVER_ACCEPTING_LANES;
const std::vector<std::string>& addresses = context_->addresses();
TP_DCHECK_EQ(addresses.size(), numLanes_);
auto packet = std::make_shared<proto::Packet>();
proto::ServerHello* pbServerHello = packet->mutable_server_hello();
auto nopHolderOut = std::make_shared<NopHolder<Packet>>();
Packet& nopPacket = nopHolderOut->getObject();
nopPacket.Become(nopPacket.index_of<ServerHello>());
ServerHello& nopServerHello = *nopPacket.get<ServerHello>();
for (uint64_t laneIdx = 0; laneIdx < numLanes_; ++laneIdx) {
proto::LaneAdvertisement* pbLaneAdvertisement =
pbServerHello->add_lane_advertisements();
pbLaneAdvertisement->set_address(addresses[laneIdx]);
nopServerHello.laneAdvertisements.emplace_back();
LaneAdvertisement& nopLaneAdvertisement =
nopServerHello.laneAdvertisements.back();
nopLaneAdvertisement.address = addresses[laneIdx];
TP_VLOG(6) << "Channel " << id_ << " requesting connection (for lane "
<< laneIdx << ")";
uint64_t token = context_->registerConnectionRequest(
@@ -240,45 +237,41 @@ void Channel::Impl::initFromLoop_() {
impl.onServerAcceptOfLane_(laneIdx, std::move(connection));
}));
laneRegistrationIds_.emplace(laneIdx, token);
pbLaneAdvertisement->set_registration_id(token);
nopLaneAdvertisement.registrationId = token;
numLanesBeingAccepted_++;
}
TP_VLOG(6) << "Channel " << id_ << " writing proto (server hello)";
connection_->write(*packet, lazyCallbackWrapper_([packet](Impl& impl) {
TP_VLOG(6) << "Channel " << impl.id_
<< " done writing proto (server hello)";
}));
TP_VLOG(6) << "Channel " << id_ << " writing nop object (server hello)";
connection_->write(
*nopHolderOut, lazyCallbackWrapper_([nopHolderOut](Impl& impl) {
TP_VLOG(6) << "Channel " << impl.id_
<< " done writing nop object (server hello)";
}));
} else {
TP_THROW_ASSERT() << "unknown endpoint";
}
}

void Channel::send(
const void* ptr,
size_t length,
CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback) {
impl_->send(ptr, length, std::move(descriptorCallback), std::move(callback));
impl_->send(buffer, std::move(descriptorCallback), std::move(callback));
}

void Channel::Impl::send(
const void* ptr,
size_t length,
CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback) {
loop_.deferToLoop([this,
ptr,
length,
buffer,
descriptorCallback{std::move(descriptorCallback)},
callback{std::move(callback)}]() mutable {
sendFromLoop_(
ptr, length, std::move(descriptorCallback), std::move(callback));
sendFromLoop_(buffer, std::move(descriptorCallback), std::move(callback));
});
}

void Channel::Impl::sendFromLoop_(
const void* ptr,
size_t length,
CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback) {
TP_DCHECK(loop_.inLoop());
@@ -318,8 +311,8 @@ void Channel::Impl::sendFromLoop_(
sendOperations_.emplace_back();
SendOperation& op = sendOperations_.back();
op.sequenceNumber = sequenceNumber;
op.ptr = ptr;
op.length = length;
op.ptr = buffer.ptr;
op.length = buffer.length;
op.callback = std::move(callback);

if (state_ == ESTABLISHED) {
@@ -331,30 +324,26 @@ void Channel::Impl::sendFromLoop_(

void Channel::recv(
TDescriptor descriptor,
void* ptr,
size_t length,
CpuBuffer buffer,
TRecvCallback callback) {
impl_->recv(std::move(descriptor), ptr, length, std::move(callback));
impl_->recv(std::move(descriptor), buffer, std::move(callback));
}

void Channel::Impl::recv(
TDescriptor descriptor,
void* ptr,
size_t length,
CpuBuffer buffer,
TRecvCallback callback) {
loop_.deferToLoop([this,
descriptor{std::move(descriptor)},
ptr,
length,
buffer,
callback{std::move(callback)}]() mutable {
recvFromLoop_(std::move(descriptor), ptr, length, std::move(callback));
recvFromLoop_(std::move(descriptor), buffer, std::move(callback));
});
}

void Channel::Impl::recvFromLoop_(
TDescriptor descriptor,
void* ptr,
size_t length,
CpuBuffer buffer,
TRecvCallback callback) {
TP_DCHECK(loop_.inLoop());

@@ -382,8 +371,8 @@ void Channel::Impl::recvFromLoop_(
recvOperations_.emplace_back();
RecvOperation& op = recvOperations_.back();
op.sequenceNumber = sequenceNumber;
op.ptr = ptr;
op.length = length;
op.ptr = buffer.ptr;
op.length = buffer.length;
op.callback = std::move(callback);

if (state_ == ESTABLISHED) {
@@ -406,29 +395,31 @@ void Channel::Impl::setIdFromLoop_(std::string id) {
id_ = std::move(id);
}

void Channel::Impl::onClientReadHelloOnConnection_(
const proto::Packet& pbPacketIn) {
void Channel::Impl::onClientReadHelloOnConnection_(const Packet& nopPacketIn) {
TP_DCHECK(loop_.inLoop());
TP_DCHECK_EQ(state_, CLIENT_READING_HELLO);
TP_DCHECK_EQ(pbPacketIn.type_case(), proto::Packet::kServerHello);
TP_DCHECK_EQ(nopPacketIn.index(), nopPacketIn.index_of<ServerHello>());

const proto::ServerHello& pbServerHello = pbPacketIn.server_hello();
TP_DCHECK_EQ(pbServerHello.lane_advertisements().size(), numLanes_);
const ServerHello& nopServerHello = *nopPacketIn.get<ServerHello>();
TP_DCHECK_EQ(nopServerHello.laneAdvertisements.size(), numLanes_);
lanes_.resize(numLanes_);
for (uint64_t laneIdx = 0; laneIdx < numLanes_; ++laneIdx) {
const proto::LaneAdvertisement& pbLaneAdvertisement =
pbServerHello.lane_advertisements().Get(laneIdx);
const LaneAdvertisement& nopLaneAdvertisement =
nopServerHello.laneAdvertisements[laneIdx];
std::shared_ptr<transport::Connection> lane =
context_->connect(laneIdx, pbLaneAdvertisement.address());
auto pbPacketOut = std::make_shared<proto::Packet>();
proto::ClientHello* pbClientHello = pbPacketOut->mutable_client_hello();
pbClientHello->set_registration_id(pbLaneAdvertisement.registration_id());
TP_VLOG(6) << "Channel " << id_ << " writing proto (client hello) on lane "
<< laneIdx;
context_->connect(laneIdx, nopLaneAdvertisement.address);
auto nopHolderOut = std::make_shared<NopHolder<Packet>>();
Packet& nopPacket = nopHolderOut->getObject();
nopPacket.Become(nopPacket.index_of<ClientHello>());
ClientHello& nopClientHello = *nopPacket.get<ClientHello>();
nopClientHello.registrationId = nopLaneAdvertisement.registrationId;
TP_VLOG(6) << "Channel " << id_
<< " writing nop object (client hello) on lane " << laneIdx;
lane->write(
*pbPacketOut, lazyCallbackWrapper_([laneIdx, pbPacketOut](Impl& impl) {
*nopHolderOut,
lazyCallbackWrapper_([laneIdx, nopHolderOut](Impl& impl) {
TP_VLOG(6) << "Channel " << impl.id_
<< " done writing proto (client hello) on lane "
<< " done writing nop object (client hello) on lane "
<< laneIdx;
}));
lanes_[laneIdx] = std::move(lane);
14 changes: 5 additions & 9 deletions tensorpipe/channel/mpt/channel.h
@@ -11,14 +11,14 @@
#include <deque>
#include <list>

#include <tensorpipe/channel/channel.h>
#include <tensorpipe/channel/cpu_context.h>
#include <tensorpipe/channel/mpt/context.h>

namespace tensorpipe {
namespace channel {
namespace mpt {

class Channel : public channel::Channel {
class Channel : public channel::CpuChannel {
// Use the passkey idiom to allow make_shared to call what should be a private
// constructor. See https://abseil.io/tips/134 for more information.
struct ConstructorToken {};
@@ -34,17 +34,13 @@ class Channel : public channel::Channel {

// Send memory region to peer.
void send(
const void* ptr,
size_t length,
CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback) override;

// Receive memory region from peer.
void recv(
TDescriptor descriptor,
void* ptr,
size_t length,
TRecvCallback callback) override;
void recv(TDescriptor descriptor, CpuBuffer buffer, TRecvCallback callback)
override;

// Tell the channel what its identifier is.
void setId(std::string id) override;
63 changes: 30 additions & 33 deletions tensorpipe/channel/mpt/context.cc
@@ -13,15 +13,14 @@
#include <unordered_map>
#include <unordered_set>

#include <tensorpipe/channel/channel.h>
#include <tensorpipe/channel/error.h>
#include <tensorpipe/channel/helpers.h>
#include <tensorpipe/channel/mpt/channel.h>
#include <tensorpipe/channel/mpt/nop_types.h>
#include <tensorpipe/channel/registry.h>
#include <tensorpipe/common/callback.h>
#include <tensorpipe/common/defs.h>
#include <tensorpipe/common/error_macros.h>
#include <tensorpipe/proto/channel/mpt.pb.h>
#include <tensorpipe/transport/context.h>
#include <tensorpipe/transport/error.h>
#include <tensorpipe/transport/listener.h>
@@ -52,9 +51,9 @@ class Context::Impl : public Context::PrivateIface,

const std::string& domainDescriptor() const;

std::shared_ptr<channel::Channel> createChannel(
std::shared_ptr<channel::CpuChannel> createChannel(
std::shared_ptr<transport::Connection>,
Channel::Endpoint);
Endpoint);

ClosingEmitter& getClosingEmitter() override;

@@ -96,7 +95,7 @@ class Context::Impl : public Context::PrivateIface,
void onAcceptOfLane_(std::shared_ptr<transport::Connection>);
void onReadClientHelloOnLane_(
std::shared_ptr<transport::Connection>,
const proto::Packet&);
const Packet&);

void setError_(Error error);

@@ -157,8 +156,8 @@ Context::Impl::Impl(
TP_THROW_ASSERT_IF(contexts_.size() != listeners_.size());
numLanes_ = contexts_.size();
// FIXME Escape the contexts' domain descriptors in case they contain a colon?
// Or put them all in a protobuf, that'll do the escaping for us.
// But is it okay to compare protobufs by equality bitwise?
// Or put them all in a nop object, that'll do the escaping for us.
// But is it okay to compare nop objects by equality bitwise?
std::ostringstream ss;
ss << contexts_.size();
for (const auto& context : contexts_) {
@@ -196,15 +195,15 @@ const std::string& Context::Impl::domainDescriptor() const {
return domainDescriptor_;
}

std::shared_ptr<channel::Channel> Context::createChannel(
std::shared_ptr<channel::CpuChannel> Context::createChannel(
std::shared_ptr<transport::Connection> connection,
Channel::Endpoint endpoint) {
Endpoint endpoint) {
return impl_->createChannel(std::move(connection), endpoint);
}

std::shared_ptr<channel::Channel> Context::Impl::createChannel(
std::shared_ptr<channel::CpuChannel> Context::Impl::createChannel(
std::shared_ptr<transport::Connection> connection,
Channel::Endpoint endpoint) {
Endpoint endpoint) {
std::string channelId = id_ + ".c" + std::to_string(channelCounter_++);
TP_VLOG(4) << "Channel context " << id_ << " is opening channel "
<< channelId;
@@ -308,42 +307,40 @@ void Context::Impl::onAcceptOfLane_(

// Keep it alive until we figure out what to do with it.
connectionsWaitingForHello_.insert(connection);
auto pbPacketIn = std::make_shared<proto::Packet>();
TP_VLOG(6) << "Channel context " << id_ << " reading proto (client hello)";
auto npHolderIn = std::make_shared<NopHolder<Packet>>();
TP_VLOG(6) << "Channel context " << id_
<< " reading nop object (client hello)";
connection->read(
*pbPacketIn,
lazyCallbackWrapper_([pbPacketIn,
*npHolderIn,
lazyCallbackWrapper_([npHolderIn,
weakConnection{std::weak_ptr<transport::Connection>(
connection)}](Impl& impl) mutable {
TP_VLOG(6) << "Channel context " << impl.id_
<< " done reading proto (client hello)";
<< " done reading nop object (client hello)";
std::shared_ptr<transport::Connection> connection =
weakConnection.lock();
TP_DCHECK(connection);
impl.connectionsWaitingForHello_.erase(connection);
impl.onReadClientHelloOnLane_(std::move(connection), *pbPacketIn);
impl.onReadClientHelloOnLane_(
std::move(connection), npHolderIn->getObject());
}));
}

void Context::Impl::onReadClientHelloOnLane_(
std::shared_ptr<transport::Connection> connection,
const proto::Packet& pbPacketIn) {
const Packet& nopPacketIn) {
TP_DCHECK(loop_.inLoop());

if (pbPacketIn.has_client_hello()) {
const proto::ClientHello& pbClientHello = pbPacketIn.client_hello();
uint64_t registrationId = pbClientHello.registration_id();
auto iter = connectionRequestRegistrations_.find(registrationId);
// The connection request may have already been deregistered, for example
// because the channel may have been closed.
if (iter != connectionRequestRegistrations_.end()) {
auto fn = std::move(iter->second);
connectionRequestRegistrations_.erase(iter);
fn(Error::kSuccess, std::move(connection));
}
} else {
TP_LOG_ERROR() << "packet contained unknown content: "
<< pbPacketIn.type_case();
TP_DCHECK_EQ(nopPacketIn.index(), nopPacketIn.index_of<ClientHello>());

const ClientHello& nopClientHello = *nopPacketIn.get<ClientHello>();
uint64_t registrationId = nopClientHello.registrationId;
auto iter = connectionRequestRegistrations_.find(registrationId);
// The connection request may have already been deregistered, for example
// because the channel may have been closed.
if (iter != connectionRequestRegistrations_.end()) {
auto fn = std::move(iter->second);
connectionRequestRegistrations_.erase(iter);
fn(Error::kSuccess, std::move(connection));
}
}

8 changes: 4 additions & 4 deletions tensorpipe/channel/mpt/context.h
Original file line number Diff line number Diff line change
@@ -12,25 +12,25 @@
#include <string>
#include <vector>

#include <tensorpipe/channel/context.h>
#include <tensorpipe/channel/cpu_context.h>
#include <tensorpipe/common/callback.h>
#include <tensorpipe/transport/context.h>

namespace tensorpipe {
namespace channel {
namespace mpt {

class Context : public channel::Context {
class Context : public channel::CpuContext {
public:
Context(
std::vector<std::shared_ptr<transport::Context>>,
std::vector<std::shared_ptr<transport::Listener>>);

const std::string& domainDescriptor() const override;

std::shared_ptr<Channel> createChannel(
std::shared_ptr<CpuChannel> createChannel(
std::shared_ptr<transport::Connection>,
Channel::Endpoint) override;
Endpoint) override;

void setId(std::string id) override;

47 changes: 47 additions & 0 deletions tensorpipe/channel/mpt/nop_types.h
@@ -0,0 +1,47 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <string>
#include <vector>

#include <nop/serializer.h>
#include <nop/structure.h>
#include <nop/types/variant.h>

namespace tensorpipe {
namespace channel {
namespace mpt {

struct LaneAdvertisement {
// This pointless constructor is needed to work around a bug in GCC 5.5 (and
// possibly other versions). It appears to be needed in the nop types that are
// used inside std::vectors.
LaneAdvertisement(){};

std::string address;
uint64_t registrationId;
NOP_STRUCTURE(LaneAdvertisement, address, registrationId);
};

struct ServerHello {
std::vector<LaneAdvertisement> laneAdvertisements;
NOP_STRUCTURE(ServerHello, laneAdvertisements);
};

struct ClientHello {
uint64_t registrationId;
NOP_STRUCTURE(ClientHello, registrationId);
};

using Packet = nop::Variant<ServerHello, ClientHello>;

} // namespace mpt
} // namespace channel
} // namespace tensorpipe
2 changes: 1 addition & 1 deletion tensorpipe/channel/registry.cc
@@ -10,4 +10,4 @@

TP_DEFINE_SHARED_REGISTRY(
TensorpipeChannelRegistry,
tensorpipe::channel::Context);
tensorpipe::channel::CpuContext);
4 changes: 2 additions & 2 deletions tensorpipe/channel/registry.h
@@ -8,9 +8,9 @@

#pragma once

#include <tensorpipe/channel/context.h>
#include <tensorpipe/channel/cpu_context.h>
#include <tensorpipe/util/registry/registry.h>

TP_DECLARE_SHARED_REGISTRY(
TensorpipeChannelRegistry,
tensorpipe::channel::Context);
tensorpipe::channel::CpuContext);
79 changes: 38 additions & 41 deletions tensorpipe/channel/xth/channel.cc
@@ -8,18 +8,29 @@

#include <tensorpipe/channel/xth/channel.h>

#include <nop/serializer.h>
#include <nop/structure.h>

#include <tensorpipe/channel/error.h>
#include <tensorpipe/channel/helpers.h>
#include <tensorpipe/common/callback.h>
#include <tensorpipe/common/defs.h>
#include <tensorpipe/common/error.h>
#include <tensorpipe/common/error_macros.h>
#include <tensorpipe/proto/channel/xth.pb.h>

namespace tensorpipe {
namespace channel {
namespace xth {

namespace {

struct Descriptor {
uint64_t ptr;
NOP_STRUCTURE(Descriptor, ptr);
};

} // namespace

class Channel::Impl : public std::enable_shared_from_this<Channel::Impl> {
public:
Impl(
@@ -31,16 +42,11 @@ class Channel::Impl : public std::enable_shared_from_this<Channel::Impl> {
void init();

void send(
const void* ptr,
size_t length,
CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback);

void recv(
TDescriptor descriptor,
void* ptr,
size_t length,
TRecvCallback callback);
void recv(TDescriptor descriptor, CpuBuffer buffer, TRecvCallback callback);

// Tell the channel what its identifier is.
void setId(std::string id);
@@ -54,16 +60,14 @@ class Channel::Impl : public std::enable_shared_from_this<Channel::Impl> {

// Send memory region to peer.
void sendFromLoop_(
const void* ptr,
size_t length,
CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback);

// Receive memory region from peer.
void recvFromLoop_(
TDescriptor descriptor,
void* ptr,
size_t length,
CpuBuffer buffer,
TRecvCallback callback);

void setIdFromLoop_(std::string id);
@@ -134,31 +138,26 @@ void Channel::Impl::initFromLoop_() {
}

void Channel::send(
const void* ptr,
size_t length,
CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback) {
impl_->send(ptr, length, std::move(descriptorCallback), std::move(callback));
impl_->send(buffer, std::move(descriptorCallback), std::move(callback));
}

void Channel::Impl::send(
-const void* ptr,
-size_t length,
+CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback) {
loop_.deferToLoop([this,
-ptr,
-length,
+buffer,
descriptorCallback{std::move(descriptorCallback)},
callback{std::move(callback)}]() mutable {
-sendFromLoop_(
-ptr, length, std::move(descriptorCallback), std::move(callback));
+sendFromLoop_(buffer, std::move(descriptorCallback), std::move(callback));
});
}

void Channel::Impl::sendFromLoop_(
-const void* ptr,
-size_t length,
+CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback) {
TP_DCHECK(loop_.inLoop());
@@ -208,39 +207,36 @@ void Channel::Impl::sendFromLoop_(
callback(impl.error_);
}));

-proto::Descriptor pbDescriptor;
-pbDescriptor.set_ptr(reinterpret_cast<std::uintptr_t>(ptr));
+NopHolder<Descriptor> nopHolder;
+Descriptor& nopDescriptor = nopHolder.getObject();
+nopDescriptor.ptr = reinterpret_cast<std::uintptr_t>(buffer.ptr);

-descriptorCallback(Error::kSuccess, saveDescriptor(pbDescriptor));
+descriptorCallback(Error::kSuccess, saveDescriptor(nopHolder));
}

// Receive memory region from peer.
void Channel::recv(
TDescriptor descriptor,
-void* ptr,
-size_t length,
+CpuBuffer buffer,
TRecvCallback callback) {
-impl_->recv(std::move(descriptor), ptr, length, std::move(callback));
+impl_->recv(std::move(descriptor), buffer, std::move(callback));
}

void Channel::Impl::recv(
TDescriptor descriptor,
-void* ptr,
-size_t length,
+CpuBuffer buffer,
TRecvCallback callback) {
loop_.deferToLoop([this,
descriptor{std::move(descriptor)},
-ptr,
-length,
+buffer,
callback{std::move(callback)}]() mutable {
-recvFromLoop_(std::move(descriptor), ptr, length, std::move(callback));
+recvFromLoop_(std::move(descriptor), buffer, std::move(callback));
});
}

void Channel::Impl::recvFromLoop_(
TDescriptor descriptor,
-void* ptr,
-size_t length,
+CpuBuffer buffer,
TRecvCallback callback) {
TP_DCHECK(loop_.inLoop());

@@ -262,15 +258,16 @@ void Channel::Impl::recvFromLoop_(
return;
}

-proto::Descriptor pbDescriptor;
-loadDescriptor(pbDescriptor, descriptor);
-void* remotePtr = reinterpret_cast<void*>(pbDescriptor.ptr());
+NopHolder<Descriptor> nopHolder;
+loadDescriptor(nopHolder, descriptor);
+Descriptor& nopDescriptor = nopHolder.getObject();
+void* remotePtr = reinterpret_cast<void*>(nopDescriptor.ptr);
TP_VLOG(6) << "Channel " << id_ << " is copying payload (#" << sequenceNumber
<< ")";
context_->requestCopy(
remotePtr,
-ptr,
-length,
+buffer.ptr,
+buffer.length,
eagerCallbackWrapper_([sequenceNumber,
callback{std::move(callback)}](Impl& impl) {
TP_VLOG(6) << "Channel " << impl.id_ << " done copying payload (#"
14 changes: 5 additions & 9 deletions tensorpipe/channel/xth/channel.h
@@ -10,14 +10,14 @@

#include <memory>

-#include <tensorpipe/channel/channel.h>
+#include <tensorpipe/channel/cpu_context.h>
#include <tensorpipe/channel/xth/context.h>

namespace tensorpipe {
namespace channel {
namespace xth {

-class Channel : public channel::Channel {
+class Channel : public channel::CpuChannel {
// Use the passkey idiom to allow make_shared to call what should be a private
// constructor. See https://abseil.io/tips/134 for more information.
struct ConstructorToken {};
@@ -31,17 +31,13 @@ class Channel : public channel::Channel {

// Send memory region to peer.
void send(
-const void* ptr,
-size_t length,
+CpuBuffer buffer,
TDescriptorCallback descriptorCallback,
TSendCallback callback) override;

// Receive memory region from peer.
-void recv(
-TDescriptor descriptor,
-void* ptr,
-size_t length,
-TRecvCallback callback) override;
+void recv(TDescriptor descriptor, CpuBuffer buffer, TRecvCallback callback)
+override;

// Tell the channel what its identifier is.
void setId(std::string id) override;
12 changes: 6 additions & 6 deletions tensorpipe/channel/xth/context.cc
@@ -58,9 +58,9 @@ class Context::Impl : public Context::PrivateIface,

const std::string& domainDescriptor() const;

-std::shared_ptr<channel::Channel> createChannel(
+std::shared_ptr<channel::CpuChannel> createChannel(
std::shared_ptr<transport::Connection>,
-Channel::Endpoint);
+Endpoint);

void setId(std::string id);

@@ -176,15 +176,15 @@ const std::string& Context::Impl::domainDescriptor() const {
return domainDescriptor_;
}

-std::shared_ptr<channel::Channel> Context::createChannel(
+std::shared_ptr<channel::CpuChannel> Context::createChannel(
std::shared_ptr<transport::Connection> connection,
-Channel::Endpoint endpoint) {
+Endpoint endpoint) {
return impl_->createChannel(std::move(connection), endpoint);
}

-std::shared_ptr<channel::Channel> Context::Impl::createChannel(
+std::shared_ptr<channel::CpuChannel> Context::Impl::createChannel(
std::shared_ptr<transport::Connection> connection,
-Channel::Endpoint /* unused */) {
+Endpoint /* unused */) {
TP_THROW_ASSERT_IF(joined_);
std::string channelId = id_ + ".c" + std::to_string(channelCounter_++);
TP_VLOG(4) << "Channel context " << id_ << " is opening channel "
8 changes: 4 additions & 4 deletions tensorpipe/channel/xth/context.h
@@ -12,23 +12,23 @@
#include <memory>
#include <string>

-#include <tensorpipe/channel/context.h>
+#include <tensorpipe/channel/cpu_context.h>
#include <tensorpipe/common/callback.h>
#include <tensorpipe/common/error.h>

namespace tensorpipe {
namespace channel {
namespace xth {

-class Context : public channel::Context {
+class Context : public channel::CpuContext {
public:
Context();

const std::string& domainDescriptor() const override;

-std::shared_ptr<Channel> createChannel(
+std::shared_ptr<CpuChannel> createChannel(
std::shared_ptr<transport::Connection>,
-Channel::Endpoint) override;
+Endpoint) override;

void setId(std::string id) override;

@@ -6,10 +6,15 @@
* LICENSE file in the root directory of this source tree.
*/

-syntax = "proto3";
+#pragma once

-package tensorpipe.channel.xth.proto;
+#include <cstddef>

-message Descriptor {
-uint64 ptr = 1;
-}
+namespace tensorpipe {
+
+struct CpuBuffer {
+void* ptr{nullptr};
+size_t length{0};
+};
+
+} // namespace tensorpipe
23 changes: 23 additions & 0 deletions tensorpipe/common/cuda_buffer.h
@@ -0,0 +1,23 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <cstddef>

#include <cuda_runtime.h>

namespace tensorpipe {

struct CudaBuffer {
void* ptr{nullptr};
size_t length{0};
cudaStream_t stream{cudaStreamDefault};
};

} // namespace tensorpipe
4 changes: 2 additions & 2 deletions tensorpipe/common/defs.h
@@ -22,8 +22,8 @@
#include <system_error>

// Branch hint macros. C++20 will include them as part of the language.
-#define likely(x) __builtin_expect(!!(x), 1)
-#define unlikely(x) __builtin_expect(!!(x), 0)
+#define likely(x) __builtin_expect((x) ? 1 : 0, 1)
+#define unlikely(x) __builtin_expect((x) ? 1 : 0, 0)

/// Auxiliary class to build an exception, fill in its what() message, and
/// throw it in a single line. Usually used as an r-value so that the destructor is called
21 changes: 21 additions & 0 deletions tensorpipe/common/error.cc
@@ -8,6 +8,7 @@

#include <tensorpipe/common/error.h>

#include <cstring>
#include <sstream>

#include <tensorpipe/common/defs.h>
@@ -21,4 +22,24 @@ std::string Error::what() const {
return error_->what();
}

std::string SystemError::what() const {
std::ostringstream ss;
ss << syscall_ << ": " << strerror(error_);
return ss.str();
}

std::string ShortReadError::what() const {
std::ostringstream ss;
ss << "short read: got " << actual_ << " bytes while expecting to read "
<< expected_ << " bytes";
return ss.str();
}

std::string ShortWriteError::what() const {
std::ostringstream ss;
ss << "short write: wrote " << actual_ << " bytes while expecting to write "
<< expected_ << " bytes";
return ss.str();
}

} // namespace tensorpipe
36 changes: 36 additions & 0 deletions tensorpipe/common/error.h
@@ -71,4 +71,40 @@ class Error final {
std::shared_ptr<BaseError> error_;
};

class SystemError final : public BaseError {
public:
explicit SystemError(const char* syscall, int error)
: syscall_(syscall), error_(error) {}

std::string what() const override;

private:
const char* syscall_;
const int error_;
};

class ShortReadError final : public BaseError {
public:
ShortReadError(ssize_t expected, ssize_t actual)
: expected_(expected), actual_(actual) {}

std::string what() const override;

private:
const ssize_t expected_;
const ssize_t actual_;
};

class ShortWriteError final : public BaseError {
public:
ShortWriteError(ssize_t expected, ssize_t actual)
: expected_(expected), actual_(actual) {}

std::string what() const override;

private:
const ssize_t expected_;
const ssize_t actual_;
};

} // namespace tensorpipe
8 changes: 2 additions & 6 deletions tensorpipe/transport/shm/fd.cc → tensorpipe/common/fd.cc
@@ -6,17 +6,15 @@
* LICENSE file in the root directory of this source tree.
*/

-#include <tensorpipe/transport/shm/fd.h>
+#include <tensorpipe/common/fd.h>

#include <unistd.h>

#include <tensorpipe/common/defs.h>
#include <tensorpipe/common/error.h>
#include <tensorpipe/common/error_macros.h>
-#include <tensorpipe/transport/error.h>

namespace tensorpipe {
-namespace transport {
-namespace shm {

Fd::~Fd() {
if (fd_ >= 0) {
@@ -73,6 +71,4 @@ Error Fd::writeFull(const void* buf, size_t count) {
return Error::kSuccess;
}

-} // namespace shm
-} // namespace transport
} // namespace tensorpipe
18 changes: 4 additions & 14 deletions tensorpipe/transport/shm/fd.h → tensorpipe/common/fd.h
@@ -8,19 +8,18 @@

#pragma once

#include <stdexcept>
#include <type_traits>

#include <unistd.h>

#include <tensorpipe/common/error.h>

namespace tensorpipe {
-namespace transport {
-namespace shm {

class Fd {
public:
-/* implicit */ Fd() {}
+Fd() = default;

explicit Fd(int fd) : fd_(fd) {}

@@ -33,12 +32,12 @@ class Fd {
Fd& operator=(const Fd&) = delete;

// Custom move constructor.
-Fd(Fd&& other) {
+Fd(Fd&& other) noexcept {
std::swap(fd_, other.fd_);
}

// Custom move assignment.
-Fd& operator=(Fd&& other) {
+Fd& operator=(Fd&& other) noexcept {
std::swap(fd_, other.fd_);
return *this;
}
@@ -48,13 +47,6 @@ class Fd {
return fd_;
}

-// Release underlying file descriptor.
-int release() {
-auto fd = fd_;
-fd_ = -1;
-return fd;
-}

// Proxy to read(2) with EINTR retry.
ssize_t read(void* buf, size_t count);

@@ -107,6 +99,4 @@ class Fd {
int fd_{-1};
};

-} // namespace shm
-} // namespace transport
} // namespace tensorpipe
59 changes: 59 additions & 0 deletions tensorpipe/common/memory.h
@@ -0,0 +1,59 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <sys/mman.h>

#include <memory>

#include <tensorpipe/common/defs.h>

namespace tensorpipe {

class MmappedPtr {
public:
MmappedPtr() = default;

MmappedPtr(size_t length, int prot, int flags, int fd) {
void* ptr;
ptr = ::mmap(nullptr, length, prot, flags, fd, 0);
TP_THROW_SYSTEM_IF(ptr == MAP_FAILED, errno);
ptr_ = decltype(ptr_)(reinterpret_cast<uint8_t*>(ptr), Deleter{length});
}

uint8_t* ptr() {
return ptr_.get();
}

const uint8_t* ptr() const {
return ptr_.get();
}

size_t getLength() const {
return ptr_.get_deleter().length;
}

void reset() {
ptr_.reset();
}

private:
struct Deleter {
size_t length;

void operator()(void* ptr) {
int ret = ::munmap(ptr, length);
TP_THROW_SYSTEM_IF(ret != 0, errno);
}
};

std::unique_ptr<uint8_t[], Deleter> ptr_{nullptr, Deleter{}};
};

} // namespace tensorpipe