How pytorch's parallel method and distributed method works?
up vote
9
down vote
favorite
I'm no expert in distributed system and CUDA. But there is one really interesting feature that PyTorch support which is nn.DataParallel
and nn.DistributedDataParallel
. How they are actually implemented? How they separate common embeddings and synchronize data?
Here is a basic example of DataParallel
.
import torch.nn as nn
from torch.autograd.variable import Variable
import numpy as np
class Model(nn.Module):
def __init__(self):
super().__init__(
embedding=nn.Embedding(1000, 10),
rnn=nn.Linear(10, 10),
)
def forward(self, x):
x = self.embedding(x)
x = self.rnn(x)
return x
model = nn.DataParallel(Model())
model.forward(Variable.from_numpy(np.array([1,2,3,4,5,6], dtype=np.int64)).cuda()).cpu()
PyTorch can split the input and send them to many GPUs and merge the results back.
How does it manage embeddings and synchronization for a parallel model or a distributed model?
I wandered around PyTorch's code but it's very hard to know how the fundamentals work.
c++ python-3.x parallel-processing distributed-computing pytorch
add a comment |
up vote
9
down vote
favorite
I'm no expert in distributed system and CUDA. But there is one really interesting feature that PyTorch support which is nn.DataParallel
and nn.DistributedDataParallel
. How they are actually implemented? How they separate common embeddings and synchronize data?
Here is a basic example of DataParallel
.
import torch.nn as nn
from torch.autograd.variable import Variable
import numpy as np
class Model(nn.Module):
def __init__(self):
super().__init__(
embedding=nn.Embedding(1000, 10),
rnn=nn.Linear(10, 10),
)
def forward(self, x):
x = self.embedding(x)
x = self.rnn(x)
return x
model = nn.DataParallel(Model())
model.forward(Variable.from_numpy(np.array([1,2,3,4,5,6], dtype=np.int64)).cuda()).cpu()
PyTorch can split the input and send them to many GPUs and merge the results back.
How does it manage embeddings and synchronization for a parallel model or a distributed model?
I wandered around PyTorch's code but it's very hard to know how the fundamentals work.
c++ python-3.x parallel-processing distributed-computing pytorch
1
It might actually be better to ask on pytorch forums.
– Umang Gupta
Nov 22 at 20:06
My question on the forum: discuss.pytorch.org/t/…
– fantasticfears
Nov 24 at 19:08
add a comment |
up vote
9
down vote
favorite
up vote
9
down vote
favorite
I'm no expert in distributed system and CUDA. But there is one really interesting feature that PyTorch support which is nn.DataParallel
and nn.DistributedDataParallel
. How they are actually implemented? How they separate common embeddings and synchronize data?
Here is a basic example of DataParallel
.
import torch.nn as nn
from torch.autograd.variable import Variable
import numpy as np
class Model(nn.Module):
def __init__(self):
super().__init__(
embedding=nn.Embedding(1000, 10),
rnn=nn.Linear(10, 10),
)
def forward(self, x):
x = self.embedding(x)
x = self.rnn(x)
return x
model = nn.DataParallel(Model())
model.forward(Variable.from_numpy(np.array([1,2,3,4,5,6], dtype=np.int64)).cuda()).cpu()
PyTorch can split the input and send them to many GPUs and merge the results back.
How does it manage embeddings and synchronization for a parallel model or a distributed model?
I wandered around PyTorch's code but it's very hard to know how the fundamentals work.
c++ python-3.x parallel-processing distributed-computing pytorch
I'm no expert in distributed system and CUDA. But there is one really interesting feature that PyTorch support which is nn.DataParallel
and nn.DistributedDataParallel
. How they are actually implemented? How they separate common embeddings and synchronize data?
Here is a basic example of DataParallel
.
import torch.nn as nn
from torch.autograd.variable import Variable
import numpy as np
class Model(nn.Module):
def __init__(self):
super().__init__(
embedding=nn.Embedding(1000, 10),
rnn=nn.Linear(10, 10),
)
def forward(self, x):
x = self.embedding(x)
x = self.rnn(x)
return x
model = nn.DataParallel(Model())
model.forward(Variable.from_numpy(np.array([1,2,3,4,5,6], dtype=np.int64)).cuda()).cpu()
PyTorch can split the input and send them to many GPUs and merge the results back.
How does it manage embeddings and synchronization for a parallel model or a distributed model?
I wandered around PyTorch's code but it's very hard to know how the fundamentals work.
c++ python-3.x parallel-processing distributed-computing pytorch
c++ python-3.x parallel-processing distributed-computing pytorch
edited Nov 19 at 13:50
blue-phoenox
3,75681440
3,75681440
asked Nov 19 at 13:13
fantasticfears
34129
34129
1
It might actually be better to ask on pytorch forums.
– Umang Gupta
Nov 22 at 20:06
My question on the forum: discuss.pytorch.org/t/…
– fantasticfears
Nov 24 at 19:08
add a comment |
1
It might actually be better to ask on pytorch forums.
– Umang Gupta
Nov 22 at 20:06
My question on the forum: discuss.pytorch.org/t/…
– fantasticfears
Nov 24 at 19:08
1
1
It might actually be better to ask on pytorch forums.
– Umang Gupta
Nov 22 at 20:06
It might actually be better to ask on pytorch forums.
– Umang Gupta
Nov 22 at 20:06
My question on the forum: discuss.pytorch.org/t/…
– fantasticfears
Nov 24 at 19:08
My question on the forum: discuss.pytorch.org/t/…
– fantasticfears
Nov 24 at 19:08
add a comment |
1 Answer
1
active
oldest
votes
up vote
-1
down vote
From what I can trace, the code is implemented in parallel_apply.py
[Edit: Paste the code here of easy reference]
def parallel_apply(modules, inputs, kwargs_tup=None, devices=None):
r"""Applies each `module` in :attr:`modules` in parallel on arguments
contained in :attr:`inputs` (positional) and :attr:`kwargs_tup` (keyword)
on each of :attr:`devices`.
Args:
modules (Module): modules to be parallelized
inputs (tensor): inputs to the modules
devices (list of int or torch.device): CUDA devices
:attr:`modules`, :attr:`inputs`, :attr:`kwargs_tup` (if given), and
:attr:`devices` (if given) should all have same length. Moreover, each
element of :attr:`inputs` can either be a single object as the only argument
to a module, or a collection of positional arguments.
"""
assert len(modules) == len(inputs)
if kwargs_tup is not None:
assert len(modules) == len(kwargs_tup)
else:
kwargs_tup = ({},) * len(modules)
if devices is not None:
assert len(modules) == len(devices)
else:
devices = [None] * len(modules)
devices = list(map(lambda x: _get_device_index(x, True), devices))
lock = threading.Lock()
results = {}
grad_enabled = torch.is_grad_enabled()
def _worker(i, module, input, kwargs, device=None):
torch.set_grad_enabled(grad_enabled)
if device is None:
device = get_a_var(input).get_device()
try:
with torch.cuda.device(device):
# this also avoids accidental slicing of `input` if it is a Tensor
if not isinstance(input, (list, tuple)):
input = (input,)
output = module(*input, **kwargs)
with lock:
results[i] = output
except Exception as e:
with lock:
results[i] = e
if len(modules) > 1:
threads = [threading.Thread(target=_worker,
args=(i, module, input, kwargs, device))
for i, (module, input, kwargs, device) in
enumerate(zip(modules, inputs, kwargs_tup, devices))]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
else:
_worker(0, modules[0], inputs[0], kwargs_tup[0], devices[0])
outputs =
for i in range(len(inputs)):
output = results[i]
if isinstance(output, Exception):
raise output
outputs.append(output)
return outputs
modules
are modules to be paralleled.
inputs
are tensors to the modules
devices
are CUDA devices
results
andoutput
store the final result
_worker()
is the main function to be run by thread
1
this does not answer the question. it's barely a "link only" answer
– Shai
Nov 28 at 22:06
add a comment |
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53375422%2fhow-pytorchs-parallel-method-and-distributed-method-works%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
up vote
-1
down vote
From what I can trace, the code is implemented in parallel_apply.py
[Edit: Paste the code here of easy reference]
def parallel_apply(modules, inputs, kwargs_tup=None, devices=None):
r"""Applies each `module` in :attr:`modules` in parallel on arguments
contained in :attr:`inputs` (positional) and :attr:`kwargs_tup` (keyword)
on each of :attr:`devices`.
Args:
modules (Module): modules to be parallelized
inputs (tensor): inputs to the modules
devices (list of int or torch.device): CUDA devices
:attr:`modules`, :attr:`inputs`, :attr:`kwargs_tup` (if given), and
:attr:`devices` (if given) should all have same length. Moreover, each
element of :attr:`inputs` can either be a single object as the only argument
to a module, or a collection of positional arguments.
"""
assert len(modules) == len(inputs)
if kwargs_tup is not None:
assert len(modules) == len(kwargs_tup)
else:
kwargs_tup = ({},) * len(modules)
if devices is not None:
assert len(modules) == len(devices)
else:
devices = [None] * len(modules)
devices = list(map(lambda x: _get_device_index(x, True), devices))
lock = threading.Lock()
results = {}
grad_enabled = torch.is_grad_enabled()
def _worker(i, module, input, kwargs, device=None):
torch.set_grad_enabled(grad_enabled)
if device is None:
device = get_a_var(input).get_device()
try:
with torch.cuda.device(device):
# this also avoids accidental slicing of `input` if it is a Tensor
if not isinstance(input, (list, tuple)):
input = (input,)
output = module(*input, **kwargs)
with lock:
results[i] = output
except Exception as e:
with lock:
results[i] = e
if len(modules) > 1:
threads = [threading.Thread(target=_worker,
args=(i, module, input, kwargs, device))
for i, (module, input, kwargs, device) in
enumerate(zip(modules, inputs, kwargs_tup, devices))]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
else:
_worker(0, modules[0], inputs[0], kwargs_tup[0], devices[0])
outputs =
for i in range(len(inputs)):
output = results[i]
if isinstance(output, Exception):
raise output
outputs.append(output)
return outputs
modules
are modules to be paralleled.
inputs
are tensors to the modules
devices
are CUDA devices
results
andoutput
store the final result
_worker()
is the main function to be run by thread
1
this does not answer the question. it's barely a "link only" answer
– Shai
Nov 28 at 22:06
add a comment |
up vote
-1
down vote
From what I can trace, the code is implemented in parallel_apply.py
[Edit: Paste the code here of easy reference]
def parallel_apply(modules, inputs, kwargs_tup=None, devices=None):
r"""Applies each `module` in :attr:`modules` in parallel on arguments
contained in :attr:`inputs` (positional) and :attr:`kwargs_tup` (keyword)
on each of :attr:`devices`.
Args:
modules (Module): modules to be parallelized
inputs (tensor): inputs to the modules
devices (list of int or torch.device): CUDA devices
:attr:`modules`, :attr:`inputs`, :attr:`kwargs_tup` (if given), and
:attr:`devices` (if given) should all have same length. Moreover, each
element of :attr:`inputs` can either be a single object as the only argument
to a module, or a collection of positional arguments.
"""
assert len(modules) == len(inputs)
if kwargs_tup is not None:
assert len(modules) == len(kwargs_tup)
else:
kwargs_tup = ({},) * len(modules)
if devices is not None:
assert len(modules) == len(devices)
else:
devices = [None] * len(modules)
devices = list(map(lambda x: _get_device_index(x, True), devices))
lock = threading.Lock()
results = {}
grad_enabled = torch.is_grad_enabled()
def _worker(i, module, input, kwargs, device=None):
torch.set_grad_enabled(grad_enabled)
if device is None:
device = get_a_var(input).get_device()
try:
with torch.cuda.device(device):
# this also avoids accidental slicing of `input` if it is a Tensor
if not isinstance(input, (list, tuple)):
input = (input,)
output = module(*input, **kwargs)
with lock:
results[i] = output
except Exception as e:
with lock:
results[i] = e
if len(modules) > 1:
threads = [threading.Thread(target=_worker,
args=(i, module, input, kwargs, device))
for i, (module, input, kwargs, device) in
enumerate(zip(modules, inputs, kwargs_tup, devices))]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
else:
_worker(0, modules[0], inputs[0], kwargs_tup[0], devices[0])
outputs =
for i in range(len(inputs)):
output = results[i]
if isinstance(output, Exception):
raise output
outputs.append(output)
return outputs
modules
are modules to be paralleled.
inputs
are tensors to the modules
devices
are CUDA devices
results
andoutput
store the final result
_worker()
is the main function to be run by thread
1
this does not answer the question. it's barely a "link only" answer
– Shai
Nov 28 at 22:06
add a comment |
up vote
-1
down vote
up vote
-1
down vote
From what I can trace, the code is implemented in parallel_apply.py
[Edit: Paste the code here of easy reference]
def parallel_apply(modules, inputs, kwargs_tup=None, devices=None):
r"""Applies each `module` in :attr:`modules` in parallel on arguments
contained in :attr:`inputs` (positional) and :attr:`kwargs_tup` (keyword)
on each of :attr:`devices`.
Args:
modules (Module): modules to be parallelized
inputs (tensor): inputs to the modules
devices (list of int or torch.device): CUDA devices
:attr:`modules`, :attr:`inputs`, :attr:`kwargs_tup` (if given), and
:attr:`devices` (if given) should all have same length. Moreover, each
element of :attr:`inputs` can either be a single object as the only argument
to a module, or a collection of positional arguments.
"""
assert len(modules) == len(inputs)
if kwargs_tup is not None:
assert len(modules) == len(kwargs_tup)
else:
kwargs_tup = ({},) * len(modules)
if devices is not None:
assert len(modules) == len(devices)
else:
devices = [None] * len(modules)
devices = list(map(lambda x: _get_device_index(x, True), devices))
lock = threading.Lock()
results = {}
grad_enabled = torch.is_grad_enabled()
def _worker(i, module, input, kwargs, device=None):
torch.set_grad_enabled(grad_enabled)
if device is None:
device = get_a_var(input).get_device()
try:
with torch.cuda.device(device):
# this also avoids accidental slicing of `input` if it is a Tensor
if not isinstance(input, (list, tuple)):
input = (input,)
output = module(*input, **kwargs)
with lock:
results[i] = output
except Exception as e:
with lock:
results[i] = e
if len(modules) > 1:
threads = [threading.Thread(target=_worker,
args=(i, module, input, kwargs, device))
for i, (module, input, kwargs, device) in
enumerate(zip(modules, inputs, kwargs_tup, devices))]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
else:
_worker(0, modules[0], inputs[0], kwargs_tup[0], devices[0])
outputs =
for i in range(len(inputs)):
output = results[i]
if isinstance(output, Exception):
raise output
outputs.append(output)
return outputs
modules
are modules to be paralleled.
inputs
are tensors to the modules
devices
are CUDA devices
results
andoutput
store the final result
_worker()
is the main function to be run by thread
From what I can trace, the code is implemented in parallel_apply.py
[Edit: Paste the code here of easy reference]
def parallel_apply(modules, inputs, kwargs_tup=None, devices=None):
r"""Applies each `module` in :attr:`modules` in parallel on arguments
contained in :attr:`inputs` (positional) and :attr:`kwargs_tup` (keyword)
on each of :attr:`devices`.
Args:
modules (Module): modules to be parallelized
inputs (tensor): inputs to the modules
devices (list of int or torch.device): CUDA devices
:attr:`modules`, :attr:`inputs`, :attr:`kwargs_tup` (if given), and
:attr:`devices` (if given) should all have same length. Moreover, each
element of :attr:`inputs` can either be a single object as the only argument
to a module, or a collection of positional arguments.
"""
assert len(modules) == len(inputs)
if kwargs_tup is not None:
assert len(modules) == len(kwargs_tup)
else:
kwargs_tup = ({},) * len(modules)
if devices is not None:
assert len(modules) == len(devices)
else:
devices = [None] * len(modules)
devices = list(map(lambda x: _get_device_index(x, True), devices))
lock = threading.Lock()
results = {}
grad_enabled = torch.is_grad_enabled()
def _worker(i, module, input, kwargs, device=None):
torch.set_grad_enabled(grad_enabled)
if device is None:
device = get_a_var(input).get_device()
try:
with torch.cuda.device(device):
# this also avoids accidental slicing of `input` if it is a Tensor
if not isinstance(input, (list, tuple)):
input = (input,)
output = module(*input, **kwargs)
with lock:
results[i] = output
except Exception as e:
with lock:
results[i] = e
if len(modules) > 1:
threads = [threading.Thread(target=_worker,
args=(i, module, input, kwargs, device))
for i, (module, input, kwargs, device) in
enumerate(zip(modules, inputs, kwargs_tup, devices))]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
else:
_worker(0, modules[0], inputs[0], kwargs_tup[0], devices[0])
outputs =
for i in range(len(inputs)):
output = results[i]
if isinstance(output, Exception):
raise output
outputs.append(output)
return outputs
modules
are modules to be paralleled.
inputs
are tensors to the modules
devices
are CUDA devices
results
andoutput
store the final result
_worker()
is the main function to be run by thread
edited Nov 29 at 1:41
answered Nov 28 at 16:17
yoonghm
1,042917
1,042917
1
this does not answer the question. it's barely a "link only" answer
– Shai
Nov 28 at 22:06
add a comment |
1
this does not answer the question. it's barely a "link only" answer
– Shai
Nov 28 at 22:06
1
1
this does not answer the question. it's barely a "link only" answer
– Shai
Nov 28 at 22:06
this does not answer the question. it's barely a "link only" answer
– Shai
Nov 28 at 22:06
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Some of your past answers have not been well-received, and you're in danger of being blocked from answering.
Please pay close attention to the following guidance:
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53375422%2fhow-pytorchs-parallel-method-and-distributed-method-works%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
1
It might actually be better to ask on pytorch forums.
– Umang Gupta
Nov 22 at 20:06
My question on the forum: discuss.pytorch.org/t/…
– fantasticfears
Nov 24 at 19:08