How do PyTorch's parallel and distributed methods work?











I'm no expert in distributed systems or CUDA, but PyTorch supports two really interesting features: nn.DataParallel and nn.DistributedDataParallel. How are they actually implemented? How do they separate common embeddings and synchronize data?



Here is a basic example of DataParallel.



import numpy as np
import torch
import torch.nn as nn

class Model(nn.Module):
    def __init__(self):
        super().__init__()
        # Submodules are registered by assigning them as attributes;
        # nn.Module.__init__ does not accept them as keyword arguments.
        self.embedding = nn.Embedding(1000, 10)
        self.rnn = nn.Linear(10, 10)

    def forward(self, x):
        x = self.embedding(x)
        x = self.rnn(x)
        return x

# DataParallel expects the wrapped module's parameters on the first GPU.
model = nn.DataParallel(Model().cuda())
x = torch.from_numpy(np.array([1, 2, 3, 4, 5, 6], dtype=np.int64)).cuda()
output = model(x).cpu()


PyTorch can split the input, send the pieces to multiple GPUs, and merge the results back.
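As a rough, framework-free illustration of that split-and-merge pattern (not PyTorch's actual implementation), the sketch below splits a batch into contiguous chunks, processes each chunk in its own thread, and concatenates the outputs in order. The names `split_batch`, `merge_outputs`, and `run_data_parallel` are hypothetical and exist only for this example; plain Python lists stand in for tensors and threads stand in for GPUs.

```python
from concurrent.futures import ThreadPoolExecutor


def split_batch(batch, num_replicas):
    """Split `batch` into at most `num_replicas` contiguous chunks."""
    # Ceiling division so that earlier chunks absorb the remainder.
    chunk_size = max(1, -(-len(batch) // num_replicas))
    return [batch[i:i + chunk_size] for i in range(0, len(batch), chunk_size)]


def merge_outputs(chunks):
    """Concatenate per-replica outputs back into one list, preserving order."""
    return [item for chunk in chunks for item in chunk]


def run_data_parallel(fn, batch, num_replicas):
    """Apply `fn` to each chunk of `batch` concurrently and merge the results."""
    if not batch:
        return []
    chunks = split_batch(batch, num_replicas)
    with ThreadPoolExecutor(max_workers=len(chunks)) as pool:
        # pool.map returns results in the same order as the input chunks.
        outputs = list(pool.map(fn, chunks))
    return merge_outputs(outputs)


def double_all(chunk):
    # Stand-in for a model's forward pass on one replica.
    return [2 * x for x in chunk]


result = run_data_parallel(double_all, [1, 2, 3, 4, 5, 6], num_replicas=2)
print(result)
```

Because every chunk goes through the same function, the merged result matches what a single worker would produce on the whole batch. This sketch does not cover how parameters are copied to each replica or how gradients are combined.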



How does it manage embeddings and synchronization for a parallel model or a distributed model?

I wandered around PyTorch's code, but it's very hard to tell how the fundamentals work.










  • It might actually be better to ask on pytorch forums.
    – Umang Gupta
    Nov 22 at 20:06

  • My question on the forum: discuss.pytorch.org/t/…
    – fantasticfears
    Nov 24 at 19:08















c++ python-3.x parallel-processing distributed-computing pytorch

edited Nov 19 at 13:50 by blue-phoenox
asked Nov 19 at 13:13 by fantasticfears










1 Answer

From what I can trace, the parallel execution is implemented in parallel_apply.py.



[Edit: Pasting the code here for easy reference]



import threading

import torch

# `_get_device_index` and `get_a_var` are helpers used by the original
# parallel_apply.py; they are not reproduced here.


def parallel_apply(modules, inputs, kwargs_tup=None, devices=None):
    r"""Applies each `module` in :attr:`modules` in parallel on arguments
    contained in :attr:`inputs` (positional) and :attr:`kwargs_tup` (keyword)
    on each of :attr:`devices`.

    Args:
        modules (Module): modules to be parallelized
        inputs (tensor): inputs to the modules
        devices (list of int or torch.device): CUDA devices

    :attr:`modules`, :attr:`inputs`, :attr:`kwargs_tup` (if given), and
    :attr:`devices` (if given) should all have same length. Moreover, each
    element of :attr:`inputs` can either be a single object as the only argument
    to a module, or a collection of positional arguments.
    """
    assert len(modules) == len(inputs)
    if kwargs_tup is not None:
        assert len(modules) == len(kwargs_tup)
    else:
        kwargs_tup = ({},) * len(modules)
    if devices is not None:
        assert len(modules) == len(devices)
    else:
        devices = [None] * len(modules)
    devices = list(map(lambda x: _get_device_index(x, True), devices))
    lock = threading.Lock()
    results = {}
    grad_enabled = torch.is_grad_enabled()

    def _worker(i, module, input, kwargs, device=None):
        # Grad mode is thread-local, so propagate the caller's setting.
        torch.set_grad_enabled(grad_enabled)
        if device is None:
            device = get_a_var(input).get_device()
        try:
            with torch.cuda.device(device):
                # this also avoids accidental slicing of `input` if it is a Tensor
                if not isinstance(input, (list, tuple)):
                    input = (input,)
                output = module(*input, **kwargs)
            with lock:
                results[i] = output
        except Exception as e:
            with lock:
                results[i] = e

    if len(modules) > 1:
        # One thread per module replica.
        threads = [threading.Thread(target=_worker,
                                    args=(i, module, input, kwargs, device))
                   for i, (module, input, kwargs, device) in
                   enumerate(zip(modules, inputs, kwargs_tup, devices))]

        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
    else:
        _worker(0, modules[0], inputs[0], kwargs_tup[0], devices[0])

    outputs = []
    for i in range(len(inputs)):
        output = results[i]
        if isinstance(output, Exception):
            # Re-raise a worker's exception in the calling thread.
            raise output
        outputs.append(output)
    return outputs




  • modules are the modules to be parallelized.
  • inputs are the tensors passed to the modules.
  • devices are the CUDA devices.
  • results holds each thread's output (or the exception it raised), and outputs is the ordered list that is returned.
  • _worker() is the function run by each thread.
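The threading pattern described in the list above can be tried without GPUs. The following is a minimal sketch, assuming plain Python callables in place of module replicas; `parallel_apply_sketch` is a hypothetical name and is not part of PyTorch. It keeps the same ideas: one thread per callable, a lock-protected `results` dictionary keyed by index, and exceptions stored by the worker and re-raised by the caller.

```python
import threading


def parallel_apply_sketch(funcs, inputs):
    """Run funcs[i](inputs[i]) in separate threads and return outputs in order."""
    assert len(funcs) == len(inputs)
    lock = threading.Lock()
    results = {}

    def _worker(i, func, arg):
        try:
            output = func(arg)
        except Exception as e:
            # Store the exception so the calling thread can re-raise it.
            output = e
        with lock:
            results[i] = output

    threads = [threading.Thread(target=_worker, args=(i, func, arg))
               for i, (func, arg) in enumerate(zip(funcs, inputs))]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    outputs = []
    for i in range(len(inputs)):
        output = results[i]
        if isinstance(output, Exception):
            raise output
        outputs.append(output)
    return outputs


outputs = parallel_apply_sketch([sum, max, len], [[1, 2, 3], [4, 5, 6], [7, 8]])
print(outputs)
```

Keying `results` by index rather than appending to a list means the returned order does not depend on which thread finishes first.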






  • this does not answer the question. it's barely a "link only" answer
    – Shai
    Nov 28 at 22:06











edited Nov 29 at 1:41
answered Nov 28 at 16:17 by yoonghm







