Combining Distributed DataParallel with Distributed RPC Framework
=================================================================
**Author**: `Pritam Damania <https://github.com/pritamdamania87>`_


This tutorial uses a simple example to demonstrate how you can combine
`DistributedDataParallel <https://pytorch.org/docs/stable/nn.html#torch.nn.parallel.DistributedDataParallel>`__ (DDP)
with the `Distributed RPC framework <https://pytorch.org/docs/master/rpc.html>`__
to train a simple model that uses both distributed data parallelism and
distributed model parallelism. The source code of the example can be found `here <https://github.com/pytorch/examples/tree/master/distributed/rpc/ddp_rpc>`__.

Previous tutorials,
`Getting Started With Distributed Data Parallel <https://pytorch.org/tutorials/intermediate/ddp_tutorial.html>`__
and `Getting Started with Distributed RPC Framework <https://pytorch.org/tutorials/intermediate/rpc_tutorial.html>`__,
described how to perform distributed data parallel and distributed model
parallel training, respectively. However, there are several training paradigms
where you might want to combine these two techniques. For example:

1) If we have a model with a sparse part (a large embedding table) and a dense
   part (FC layers), we might want to put the embedding table on a parameter
   server and replicate the FC layers across multiple trainers using `DistributedDataParallel <https://pytorch.org/docs/stable/nn.html#torch.nn.parallel.DistributedDataParallel>`__.
   The `Distributed RPC framework <https://pytorch.org/docs/master/rpc.html>`__
   can be used to perform embedding lookups on the parameter server.
2) Enable hybrid parallelism as described in the `PipeDream <https://arxiv.org/abs/1806.03377>`__ paper.
   We can use the `Distributed RPC framework <https://pytorch.org/docs/master/rpc.html>`__
   to pipeline stages of the model across multiple workers and replicate each
   stage (if needed) using `DistributedDataParallel <https://pytorch.org/docs/stable/nn.html#torch.nn.parallel.DistributedDataParallel>`__.

|
In this tutorial, we will cover case 1 mentioned above. We have a total of 4
workers in our setup, as follows:


1) 1 Master, which is responsible for creating an embedding table
   (nn.EmbeddingBag) on the parameter server. The master also drives the
   training loop on the two trainers.
2) 1 Parameter Server, which holds the embedding table in memory and
   responds to RPCs from the Master and Trainers.
3) 2 Trainers, which store an FC layer (nn.Linear) that is replicated across
   both trainers using `DistributedDataParallel <https://pytorch.org/docs/stable/nn.html#torch.nn.parallel.DistributedDataParallel>`__.
   The trainers are also responsible for executing the forward pass, backward
   pass and optimizer step.

|
The entire training process is executed as follows:

1) The master creates an embedding table on the Parameter Server and holds an
   `RRef <https://pytorch.org/docs/master/rpc.html#rref>`__ to it.
2) The master then kicks off the training loop on the trainers and passes the
   embedding table RRef to the trainers.
3) The trainers create a ``HybridModel`` which first performs an embedding lookup
   using the embedding table RRef provided by the master and then executes the
   FC layer which is wrapped inside DDP.
4) The trainer executes the forward pass of the model and uses the loss to
   execute the backward pass using `Distributed Autograd <https://pytorch.org/docs/master/rpc.html#distributed-autograd-framework>`__.
5) As part of the backward pass, the gradients for the FC layer are computed
   first and synced to all trainers via allreduce in DDP.
6) Next, Distributed Autograd propagates the gradients to the parameter server,
   where the gradients for the embedding table are accumulated.
7) Finally, the `Distributed Optimizer <https://pytorch.org/docs/master/rpc.html#module-torch.distributed.optim>`__ is used to update all the parameters.


.. attention::

   You should always use `Distributed Autograd <https://pytorch.org/docs/master/rpc.html#distributed-autograd-framework>`__
   for the backward pass if you're combining DDP and RPC.


Now, let's go through each part in detail. First, we need to set up all of our
workers before we can perform any training. We create 4 processes such that
ranks 0 and 1 are our trainers, rank 2 is the master and rank 3 is the
parameter server.

We initialize the RPC framework on all 4 workers using a TCP ``init_method``.
Once RPC initialization is done, the master creates an `EmbeddingBag <https://pytorch.org/docs/master/generated/torch.nn.EmbeddingBag.html>`__
on the Parameter Server using `rpc.remote <https://pytorch.org/docs/master/rpc.html#torch.distributed.rpc.remote>`__.
The master then loops through each trainer and kicks off the training loop by
calling ``_run_trainer`` on each trainer using `rpc_async <https://pytorch.org/docs/master/rpc.html#torch.distributed.rpc.rpc_async>`__.
Finally, the master waits for all training to finish before exiting.

The trainers first initialize a ``ProcessGroup`` for DDP with ``world_size=2``
(for two trainers) using `init_process_group <https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group>`__.
Next, they initialize the RPC framework using a TCP ``init_method``. Note that
RPC initialization and ``ProcessGroup`` initialization use different ports,
which avoids port conflicts between the two frameworks.
Once the initialization is done, the trainers just wait for the ``_run_trainer``
RPC from the master.

The parameter server just initializes the RPC framework and waits for RPCs from
the trainers and master.


.. literalinclude:: ../advanced_source/rpc_ddp_tutorial/main.py
   :language: py
   :start-after: BEGIN run_worker
   :end-before: END run_worker
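
As a quick reference, here is a condensed sketch of the worker setup described
above. It is a sketch rather than the exact code from the example: the embedding
table sizes, the port numbers, the ``gloo`` backend and the use of
``TensorPipeRpcBackendOptions`` are assumptions made for illustration, and
``_run_trainer`` is only stubbed out here (it is covered in the rest of this
tutorial).

.. code:: python

    import torch
    import torch.distributed as dist
    import torch.distributed.rpc as rpc
    import torch.multiprocessing as mp

    # Illustrative sizes; the linked example may use different values.
    NUM_EMBEDDINGS = 100
    EMBEDDING_DIM = 16


    def _run_trainer(emb_rref, rank):
        # Placeholder; the trainer logic is described later in this tutorial.
        pass


    def run_worker(rank, world_size):
        # RPC and the DDP ProcessGroup deliberately use different ports so
        # that the two initializations do not conflict. Addresses are
        # illustrative.
        rpc_backend_options = rpc.TensorPipeRpcBackendOptions(
            init_method="tcp://localhost:29501")

        if rank == 2:
            # Master: create the embedding table remotely on the parameter
            # server, then kick off training on both trainers.
            rpc.init_rpc("master", rank=rank, world_size=world_size,
                         rpc_backend_options=rpc_backend_options)
            emb_rref = rpc.remote(
                "ps", torch.nn.EmbeddingBag,
                args=(NUM_EMBEDDINGS, EMBEDDING_DIM),
                kwargs={"mode": "sum"})
            futs = [
                rpc.rpc_async("trainer{}".format(trainer_rank), _run_trainer,
                              args=(emb_rref, trainer_rank))
                for trainer_rank in [0, 1]
            ]
            # Wait for both trainers to finish before shutting down.
            for fut in futs:
                fut.wait()
        elif rank <= 1:
            # Trainers: initialize the DDP ProcessGroup first (on its own
            # port), then the RPC framework, and then simply wait for the
            # _run_trainer RPC from the master.
            dist.init_process_group(
                backend="gloo", rank=rank, world_size=2,
                init_method="tcp://localhost:29500")
            rpc.init_rpc("trainer{}".format(rank), rank=rank,
                         world_size=world_size,
                         rpc_backend_options=rpc_backend_options)
        else:
            # Parameter server: only initialize RPC and serve requests.
            rpc.init_rpc("ps", rank=rank, world_size=world_size,
                         rpc_backend_options=rpc_backend_options)

        # Block until all RPC activity across the 4 workers has finished.
        rpc.shutdown()


    if __name__ == "__main__":
        mp.spawn(run_worker, args=(4,), nprocs=4, join=True)
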

Before we discuss details of the Trainer, let's introduce the ``HybridModel`` that
the trainer uses. As shown below, the ``HybridModel`` is initialized using an
RRef to the embedding table (``emb_rref``) on the parameter server and the ``device``
to use for DDP. The initialization of the model wraps an
`nn.Linear <https://pytorch.org/docs/master/generated/torch.nn.Linear.html>`__
layer inside DDP to replicate and synchronize this layer across all trainers.

The forward method of the model is straightforward. It performs an
embedding lookup on the parameter server using an
`RRef helper <https://pytorch.org/docs/master/rpc.html#torch.distributed.rpc.RRef.rpc_sync>`__
and passes its output to the FC layer.


.. literalinclude:: ../advanced_source/rpc_ddp_tutorial/main.py
   :language: py
   :start-after: BEGIN hybrid_model
   :end-before: END hybrid_model
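
A minimal sketch of such a ``HybridModel`` is shown below for quick reference.
The layer sizes (16 input features, 8 output features) are illustrative, and it
assumes each trainer drives one GPU whose index matches the trainer's rank; see
the included source above for the exact implementation.

.. code:: python

    import torch.nn as nn
    from torch.nn.parallel import DistributedDataParallel as DDP


    class HybridModel(nn.Module):
        """
        Sparse part: an nn.EmbeddingBag held remotely on the parameter server,
        reachable through ``emb_rref``. Dense part: a local FC layer wrapped in
        DDP so that it is replicated and synchronized across both trainers.
        """
        def __init__(self, emb_rref, device):
            super(HybridModel, self).__init__()
            self.emb_rref = emb_rref
            self.device = device
            # Only the dense layer is managed by DDP; the feature sizes are
            # illustrative and must match the embedding dimension.
            self.fc = DDP(nn.Linear(16, 8).cuda(device), device_ids=[device])

        def forward(self, indices, offsets):
            # Synchronous RPC to the owner of the RRef (the parameter server):
            # run the EmbeddingBag forward remotely and fetch the result.
            emb_lookup = self.emb_rref.rpc_sync().forward(indices, offsets)
            return self.fc(emb_lookup.cuda(self.device))
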

Next, let's look at the setup on the Trainer. The trainer first creates the
``HybridModel`` described above using an RRef to the embedding table on the
parameter server and its own rank.

Now, we need to retrieve a list of RRefs to all the parameters that we would
like to optimize with `DistributedOptimizer <https://pytorch.org/docs/master/rpc.html#module-torch.distributed.optim>`__.
To retrieve the parameters for the embedding table from the parameter server,
we define a simple helper function ``_retrieve_embedding_parameters``, which
walks through all the parameters of the embedding table and returns
a list of RRefs. The trainer calls this method on the parameter server via RPC
to receive a list of RRefs to the desired parameters. Since the
DistributedOptimizer always takes a list of RRefs to the parameters that need to
be optimized, we need to create RRefs even for the local parameters of our
FC layers. This is done by walking ``model.parameters()``, creating an RRef for
each parameter and appending it to a list. Note that ``model.parameters()`` only
returns local parameters and doesn't include ``emb_rref``.

Finally, we create our DistributedOptimizer using all the RRefs and define a
``CrossEntropyLoss`` function.

.. literalinclude:: ../advanced_source/rpc_ddp_tutorial/main.py
   :language: py
   :start-after: BEGIN setup_trainer
   :end-before: END setup_trainer
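
The sketch below condenses these steps. The helper name ``_setup_trainer``, the
choice of ``optim.SGD`` and ``lr=0.05`` are assumptions made for illustration,
and it reuses the ``HybridModel`` sketched earlier; the example performs the
equivalent work inside ``_run_trainer``.

.. code:: python

    import torch
    import torch.distributed.rpc as rpc
    import torch.optim as optim
    from torch.distributed.optim import DistributedOptimizer
    from torch.distributed.rpc import RRef


    def _retrieve_embedding_parameters(emb_rref):
        # Runs on the parameter server (the owner of emb_rref): wrap every
        # parameter of the embedding table in an RRef so that the trainers
        # can hand them to the DistributedOptimizer.
        return [RRef(p) for p in emb_rref.local_value().parameters()]


    def _setup_trainer(emb_rref, rank):
        # Remote embedding table + local DDP-wrapped FC layer.
        model = HybridModel(emb_rref, rank)

        # Fetch RRefs to the embedding table parameters from the parameter
        # server, then add RRefs for the local (FC) parameters as well. Note
        # that model.parameters() only returns the local parameters.
        model_parameter_rrefs = rpc.rpc_sync(
            "ps", _retrieve_embedding_parameters, args=(emb_rref,))
        for param in model.parameters():
            model_parameter_rrefs.append(RRef(param))

        # A single optimizer that covers both remote and local parameters.
        # SGD and lr=0.05 are illustrative choices.
        opt = DistributedOptimizer(optim.SGD, model_parameter_rrefs, lr=0.05)
        criterion = torch.nn.CrossEntropyLoss()
        return model, opt, criterion
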

Now we're ready to introduce the main training loop that is run on each trainer.
``get_next_batch`` is just a helper function to generate random inputs and
targets for training. We run the training loop for multiple epochs and for each
batch:

1) Set up a `Distributed Autograd Context <https://pytorch.org/docs/master/rpc.html#torch.distributed.autograd.context>`__
   for the backward pass.
2) Run the forward pass of the model and retrieve its output.
3) Compute the loss based on our outputs and targets using the loss function.
4) Use Distributed Autograd to execute a distributed backward pass using the loss.
5) Finally, run a Distributed Optimizer step to optimize all the parameters.

.. literalinclude:: ../advanced_source/rpc_ddp_tutorial/main.py
   :language: py
   :start-after: BEGIN run_trainer
   :end-before: END run_trainer
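
For quick reference, a minimal sketch of such a training loop is shown below.
``_get_next_batch`` is a hypothetical stand-in for the example's
``get_next_batch`` helper, and the batch size, number of embeddings, number of
classes and epoch count are arbitrary illustrative values.

.. code:: python

    import torch
    import torch.distributed.autograd as dist_autograd


    def _get_next_batch(num_batches=10, batch_size=8):
        # Hypothetical stand-in for get_next_batch: random EmbeddingBag
        # indices/offsets plus random class targets, matching the sizes
        # assumed in the earlier sketches (100 embeddings, 8 classes).
        for _ in range(num_batches):
            indices = torch.randint(0, 100, (batch_size * 4,))
            offsets = torch.arange(0, batch_size * 4, 4)
            target = torch.randint(0, 8, (batch_size,))
            yield indices, offsets, target


    def train_loop(model, opt, criterion, num_epochs=25):
        for epoch in range(num_epochs):
            for indices, offsets, target in _get_next_batch():
                # 1) Each iteration runs inside its own distributed autograd
                #    context; gradients are scoped to this context, so there
                #    is no zero_grad() call.
                with dist_autograd.context() as context_id:
                    # 2) + 3) Forward pass and loss computation.
                    output = model(indices, offsets)
                    loss = criterion(output, target.to(output.device))

                    # 4) Distributed backward pass: DDP all-reduces the FC
                    #    gradients while the embedding gradients are sent to
                    #    the parameter server.
                    dist_autograd.backward(context_id, [loss])

                    # 5) Distributed optimizer step over all parameter RRefs.
                    opt.step(context_id)
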
|
Source code for the entire example can be found `here <https://github.com/pytorch/examples/tree/master/distributed/rpc/ddp_rpc>`__.