Save image from RDD Apache Spark system
I want to retrieve the images that I have stored on my RDD system after I map them.
I created a simple Spark Session on my main.py
file which calls the function preprocess_spark that returns an array of tuples named samples
. These tuples are in the (slide_num, image)
format. Image is an np.array
that will be converted to an image in the save_jpeg_help
function.
When I open the Apache Spark WEB UI I see that it has a job corresponding to the line:
rdd.foreach(lambda sample_element: save_nonlabelled_sample_2_jpeg(sample_element, save_dir))
but when it finishes nothing is ever saved on my save_dir
directory.
Any idea what I'm doing wrong?
Kind regards
main.py
# Build (or reuse) the Spark session for the preprocessing job.
builder = SparkSession.builder.appName("Oncofinder -- Preprocessing")
spark = builder.getOrCreate()

# Preprocess slide #1 into (slide_num, image) sample tuples.
samples = preprocess_spark(spark, [1])

if save_jpegs:  # SET TO TRUE
    save_rdd_2_jpeg(samples, './data/images')
def save_rdd_2_jpeg(rdd, save_dir):
    """Save every (slide_num, image) sample in ``rdd`` as a JPEG under ``save_dir``.

    BUG FIX: the original ``rdd.foreach(...)`` executes the save function on
    the *executor* processes, so the files are written to the worker nodes'
    local filesystems — nothing ever appears in ``save_dir`` on the driver.
    Collect the samples to the driver first, then save them locally.

    NOTE(review): ``collect()`` materializes the whole RDD in driver memory;
    acceptable for this use case, but confirm the sample set fits in RAM.
    """
    for sample in rdd.collect():
        save_nonlabelled_sample_2_jpeg(sample, save_dir)
def save_nonlabelled_sample_2_jpeg(sample, save_dir):
    """Persist a single (slide_num, image) sample as a JPEG file in ``save_dir``."""
    slide_num, img_value = sample
    # Random tag keeps distinct samples from the same slide from colliding.
    tag = np.random.randint(1e4)
    filename = '{slide_num}_{hash}.jpeg'.format(slide_num=slide_num, hash=tag)
    save_jpeg_help(img_value, os.path.join(save_dir, filename))
def save_jpeg_help(img_value, filepath):
    """Convert an array to an RGB PIL image and write it to ``filepath``.

    Creates the destination directory first if it does not exist.
    """
    # Renamed from `dir`, which shadowed the `dir` builtin.
    out_dir = os.path.dirname(filepath)
    os.makedirs(out_dir, exist_ok=True)
    # assumes img_value is an (H, W, 3) uint8-compatible array — TODO confirm
    img = Image.fromarray(img_value.astype(np.uint8), 'RGB')
    img.save(filepath)
def preprocess_spark(spark, slide_nums, folder="data", training=False, tile_size=1024, overlap=0,
                     tissue_threshold=0.9, sample_size=256, grayscale=False, normalize_stains=True,
                     num_partitions=20000):
    """Build an RDD of (slide_num, image) samples from the given slide numbers."""
    sc = spark.sparkContext
    # Keep only the slides that can actually be opened.
    valid_slides = sc.parallelize(slide_nums).filter(
        lambda slide: open_slide(slide, folder, training) is not None)
    # Expand each slide into tile indices and spread them over many partitions.
    indices = valid_slides.flatMap(
        lambda slide: process_slide(slide, folder, training, tile_size, overlap))
    indices = indices.repartition(num_partitions)
    indices.cache()
    # Materialize tiles, drop low-tissue ones, cut into fixed-size samples.
    tiles = indices.map(lambda tile_index: process_tile_index(tile_index, folder, training))
    kept = tiles.filter(lambda tile: keep_tile(tile, tile_size, tissue_threshold))
    samples = kept.flatMap(lambda tile: process_tile(tile, sample_size, grayscale))
    if normalize_stains:
        samples = samples.map(lambda sample: normalize_staining(sample))
    return samples
EDIT: I'm using
PYSPARK_PYTHON=python3 spark-submit --master spark://127.0.1.1:7077 spark_preprocessing.py
to run the application. It seems that after the foreach
action, nothing else happens. Is there any reason for that?
python apache-spark pyspark
add a comment |
I want to retrieve the images that I have stored on my RDD system after I map them.
I created a simple Spark Session on my main.py
file which calls the function preprocess_spark that returns an array of tuples named samples
. These tuples are in the (slide_num, image)
format. Image is an np.array
that will be converted to an image in the save_jpeg_help
function.
When I open the Apache Spark WEB UI I see that it has a job corresponding to the line:
rdd.foreach(lambda sample_element: save_nonlabelled_sample_2_jpeg(sample_element, save_dir))
but when it finishes nothing is ever saved on my save_dir
directory.
Any idea what I'm doing wrong?
Kind regards
main.py
# NOTE(review): scraper-duplicated copy of the question's code; body
# indentation was lost in page extraction, so this snippet is not runnable as-is.
spark = (SparkSession.builder
.appName("Oncofinder -- Preprocessing")
.getOrCreate())
samples = preprocess_spark(spark, [1])
if save_jpegs: #SET TO TRUE
save_rdd_2_jpeg(samples, './data/images')
# Saves each RDD element via foreach — this runs on the executors, not the driver.
def save_rdd_2_jpeg(rdd, save_dir):
rdd.foreach(lambda sample_element: save_nonlabelled_sample_2_jpeg(sample_element, save_dir))
# Builds a '<slide>_<random>.jpeg' path and delegates the actual write.
def save_nonlabelled_sample_2_jpeg(sample, save_dir):
slide_num, img_value = sample
filename = '{slide_num}_{hash}.jpeg'.format(
slide_num=slide_num, hash=np.random.randint(1e4))
filepath = os.path.join(save_dir, filename)
save_jpeg_help(img_value, filepath)
# Converts the array to an RGB PIL image and writes it to disk.
def save_jpeg_help(img_value, filepath):
dir = os.path.dirname(filepath)
os.makedirs(dir, exist_ok=True)
img = Image.fromarray(img_value.astype(np.uint8), 'RGB')
img.save(filepath)
# Spark pipeline: slide numbers -> tile indices -> tiles -> filtered samples.
def preprocess_spark(spark, slide_nums, folder="data", training=False, tile_size=1024, overlap=0,
tissue_threshold=0.9, sample_size=256, grayscale=False, normalize_stains=True,
num_partitions=20000):
slides = (spark.sparkContext
.parallelize(slide_nums)
.filter(lambda slide: open_slide(slide, folder, training) is not None))
tile_indices = (slides.flatMap(
lambda slide: process_slide(slide, folder, training, tile_size, overlap)))
tile_indices = tile_indices.repartition(num_partitions)
tile_indices.cache()
tiles = tile_indices.map(lambda tile_index: process_tile_index(tile_index, folder, training))
filtered_tiles = tiles.filter(lambda tile: keep_tile(tile, tile_size, tissue_threshold))
samples = filtered_tiles.flatMap(lambda tile: process_tile(tile, sample_size, grayscale))
if normalize_stains:
samples = samples.map(lambda sample: normalize_staining(sample))
return samples
EDIT: I'm using
PYSPARK_PYTHON=python3 spark-submit --master spark://127.0.1.1:7077 spark_preprocessing.py
to run the application. It seems that after the foreach
action, nothing else happens. Is there any reason for that?
python apache-spark pyspark
1
Are you running this on a single node or multiple nodes? Are you using a relative path or an absolute path?
– ayplam
Nov 28 '18 at 7:59
1
Given that you're running this on multiple nodes, you definitely cannot just call img.save(filepath)
and expect it to work like it would in normal Python execution. Lord only knows where on your cluster your image files are actually ending up. Instead, you'll have to collect
the image arrays on the driver node and only then run Image.fromarray
and img.save
on those arrays.
– tel
Dec 8 '18 at 4:04
I tried rdd.collect().foreach(...)
instead of rdd.foreach(...)
but I get 'list' object has no attribute 'foreach'
– Luís Costa
Dec 8 '18 at 6:29
rdd.collect()
will return a standard Python list.
You would then have to iterate over the contents of that list and call your image saving function on each element. See my answer below for more details.
– tel
Dec 9 '18 at 2:15
add a comment |
I want to retrieve the images that I have stored on my RDD system after I map them.
I created a simple Spark Session on my main.py
file which calls the function preprocess_spark that returns an array of tuples named samples
. These tuples are in the (slide_num, image)
format. Image is an np.array
that will be converted to an image in the save_jpeg_help
function.
When I open the Apache Spark WEB UI I see that it has a job corresponding to the line:
rdd.foreach(lambda sample_element: save_nonlabelled_sample_2_jpeg(sample_element, save_dir))
but when it finishes nothing is ever saved on my save_dir
directory.
Any idea what I'm doing wrong?
Kind regards
main.py
# NOTE(review): scraper-duplicated copy of the question's code; body
# indentation was lost in page extraction, so this snippet is not runnable as-is.
spark = (SparkSession.builder
.appName("Oncofinder -- Preprocessing")
.getOrCreate())
samples = preprocess_spark(spark, [1])
if save_jpegs: #SET TO TRUE
save_rdd_2_jpeg(samples, './data/images')
# Saves each RDD element via foreach — this runs on the executors, not the driver.
def save_rdd_2_jpeg(rdd, save_dir):
rdd.foreach(lambda sample_element: save_nonlabelled_sample_2_jpeg(sample_element, save_dir))
# Builds a '<slide>_<random>.jpeg' path and delegates the actual write.
def save_nonlabelled_sample_2_jpeg(sample, save_dir):
slide_num, img_value = sample
filename = '{slide_num}_{hash}.jpeg'.format(
slide_num=slide_num, hash=np.random.randint(1e4))
filepath = os.path.join(save_dir, filename)
save_jpeg_help(img_value, filepath)
# Converts the array to an RGB PIL image and writes it to disk.
def save_jpeg_help(img_value, filepath):
dir = os.path.dirname(filepath)
os.makedirs(dir, exist_ok=True)
img = Image.fromarray(img_value.astype(np.uint8), 'RGB')
img.save(filepath)
# Spark pipeline: slide numbers -> tile indices -> tiles -> filtered samples.
def preprocess_spark(spark, slide_nums, folder="data", training=False, tile_size=1024, overlap=0,
tissue_threshold=0.9, sample_size=256, grayscale=False, normalize_stains=True,
num_partitions=20000):
slides = (spark.sparkContext
.parallelize(slide_nums)
.filter(lambda slide: open_slide(slide, folder, training) is not None))
tile_indices = (slides.flatMap(
lambda slide: process_slide(slide, folder, training, tile_size, overlap)))
tile_indices = tile_indices.repartition(num_partitions)
tile_indices.cache()
tiles = tile_indices.map(lambda tile_index: process_tile_index(tile_index, folder, training))
filtered_tiles = tiles.filter(lambda tile: keep_tile(tile, tile_size, tissue_threshold))
samples = filtered_tiles.flatMap(lambda tile: process_tile(tile, sample_size, grayscale))
if normalize_stains:
samples = samples.map(lambda sample: normalize_staining(sample))
return samples
EDIT: I'm using
PYSPARK_PYTHON=python3 spark-submit --master spark://127.0.1.1:7077 spark_preprocessing.py
to run the application. It seems that after the foreach
action, nothing else happens. Is there any reason for that?
python apache-spark pyspark
I want to retrieve the images that I have stored on my RDD system after I map them.
I created a simple Spark Session on my main.py
file which calls the function preprocess_spark that returns an array of tuples named samples
. These tuples are in the (slide_num, image)
format. Image is an np.array
that will be converted to an image in the save_jpeg_help
function.
When I open the Apache Spark WEB UI I see that it has a job corresponding to the line:
rdd.foreach(lambda sample_element: save_nonlabelled_sample_2_jpeg(sample_element, save_dir))
but when it finishes nothing is ever saved on my save_dir
directory.
Any idea what I'm doing wrong?
Kind regards
main.py
# NOTE(review): scraper-duplicated copy of the question's code; body
# indentation was lost in page extraction, so this snippet is not runnable as-is.
spark = (SparkSession.builder
.appName("Oncofinder -- Preprocessing")
.getOrCreate())
samples = preprocess_spark(spark, [1])
if save_jpegs: #SET TO TRUE
save_rdd_2_jpeg(samples, './data/images')
# Saves each RDD element via foreach — this runs on the executors, not the driver.
def save_rdd_2_jpeg(rdd, save_dir):
rdd.foreach(lambda sample_element: save_nonlabelled_sample_2_jpeg(sample_element, save_dir))
# Builds a '<slide>_<random>.jpeg' path and delegates the actual write.
def save_nonlabelled_sample_2_jpeg(sample, save_dir):
slide_num, img_value = sample
filename = '{slide_num}_{hash}.jpeg'.format(
slide_num=slide_num, hash=np.random.randint(1e4))
filepath = os.path.join(save_dir, filename)
save_jpeg_help(img_value, filepath)
# Converts the array to an RGB PIL image and writes it to disk.
def save_jpeg_help(img_value, filepath):
dir = os.path.dirname(filepath)
os.makedirs(dir, exist_ok=True)
img = Image.fromarray(img_value.astype(np.uint8), 'RGB')
img.save(filepath)
# Spark pipeline: slide numbers -> tile indices -> tiles -> filtered samples.
def preprocess_spark(spark, slide_nums, folder="data", training=False, tile_size=1024, overlap=0,
tissue_threshold=0.9, sample_size=256, grayscale=False, normalize_stains=True,
num_partitions=20000):
slides = (spark.sparkContext
.parallelize(slide_nums)
.filter(lambda slide: open_slide(slide, folder, training) is not None))
tile_indices = (slides.flatMap(
lambda slide: process_slide(slide, folder, training, tile_size, overlap)))
tile_indices = tile_indices.repartition(num_partitions)
tile_indices.cache()
tiles = tile_indices.map(lambda tile_index: process_tile_index(tile_index, folder, training))
filtered_tiles = tiles.filter(lambda tile: keep_tile(tile, tile_size, tissue_threshold))
samples = filtered_tiles.flatMap(lambda tile: process_tile(tile, sample_size, grayscale))
if normalize_stains:
samples = samples.map(lambda sample: normalize_staining(sample))
return samples
EDIT: I'm using
PYSPARK_PYTHON=python3 spark-submit --master spark://127.0.1.1:7077 spark_preprocessing.py
to run the application. It seems that after the foreach
action, nothing else happens. Is there any reason for that?
python apache-spark pyspark
python apache-spark pyspark
edited Dec 9 '18 at 2:10
tel
7,41621431
7,41621431
asked Nov 28 '18 at 2:04
Luís CostaLuís Costa
335219
335219
1
Are you running this on a single node or multiple nodes? Are you using a relative path or an absolute path?
– ayplam
Nov 28 '18 at 7:59
1
Given that you're running this on multiple nodes, you definitely cannot just callimg.save(filepath)
and expect it to work like it would in normal Python execution. Lord only knows where on your cluster your image files are actually ending up. Instead, you'll have tocollect
the image arrays on the driver node and only then runImage.fromarray
andimg.save
on those arrays.
– tel
Dec 8 '18 at 4:04
I triedrdd.collect().foreach(...)
instead ofrdd.foreach(...)
but I getlist object has not attribute foreach
– Luís Costa
Dec 8 '18 at 6:29
rdd.collect()
will return a standard Pythonlist
. You would then have to iterate over the contents of that list and call your image saving function on each element. See my answer below for more details.
– tel
Dec 9 '18 at 2:15
add a comment |
1
Are you running this on a single node or multiple nodes? Are you using a relative path or an absolute path?
– ayplam
Nov 28 '18 at 7:59
1
Given that you're running this on multiple nodes, you definitely cannot just callimg.save(filepath)
and expect it to work like it would in normal Python execution. Lord only knows where on your cluster your image files are actually ending up. Instead, you'll have tocollect
the image arrays on the driver node and only then runImage.fromarray
andimg.save
on those arrays.
– tel
Dec 8 '18 at 4:04
I triedrdd.collect().foreach(...)
instead ofrdd.foreach(...)
but I getlist object has not attribute foreach
– Luís Costa
Dec 8 '18 at 6:29
rdd.collect()
will return a standard Pythonlist
. You would then have to iterate over the contents of that list and call your image saving function on each element. See my answer below for more details.
– tel
Dec 9 '18 at 2:15
1
1
Are you running this on a single node or multiple nodes? Are you using a relative path or an absolute path?
– ayplam
Nov 28 '18 at 7:59
Are you running this on a single node or multiple nodes? Are you using a relative path or an absolute path?
– ayplam
Nov 28 '18 at 7:59
1
1
Given that you're running this on multiple nodes, you definitely cannot just call
img.save(filepath)
and expect it to work like it would in normal Python execution. Lord only knows where on your cluster your image files are actually ending up. Instead, you'll have to collect
the image arrays on the driver node and only then run Image.fromarray
and img.save
on those arrays.– tel
Dec 8 '18 at 4:04
Given that you're running this on multiple nodes, you definitely cannot just call
img.save(filepath)
and expect it to work like it would in normal Python execution. Lord only knows where on your cluster your image files are actually ending up. Instead, you'll have to collect
the image arrays on the driver node and only then run Image.fromarray
and img.save
on those arrays.– tel
Dec 8 '18 at 4:04
I tried
rdd.collect().foreach(...)
instead of rdd.foreach(...)
but I get list object has not attribute foreach
– Luís Costa
Dec 8 '18 at 6:29
I tried
rdd.collect().foreach(...)
instead of rdd.foreach(...)
but I get list object has not attribute foreach
– Luís Costa
Dec 8 '18 at 6:29
rdd.collect()
will return a standard Python list
. You would then have to iterate over the contents of that list and call your image saving function on each element. See my answer below for more details.– tel
Dec 9 '18 at 2:15
rdd.collect()
will return a standard Python list
. You would then have to iterate over the contents of that list and call your image saving function on each element. See my answer below for more details.– tel
Dec 9 '18 at 2:15
add a comment |
1 Answer
1
active
oldest
votes
You can fix the issue you're having if you collect
all of your samples on to the driver node before you try to save them. If you redefine save_rdd_2_jpeg
as:
def save_rdd_2_jpeg(rdd, save_dir):
    """Collect all samples onto the driver, then save each one as a JPEG."""
    collected_samples = rdd.collect()
    for collected_sample in collected_samples:
        save_nonlabelled_sample_2_jpeg(collected_sample, save_dir)
then everything should work.
add a comment |
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53511027%2fsave-image-from-rdd-apache-spark-system%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
You can fix the issue you're having if you collect
all of your samples on to the driver node before you try to save them. If you redefine save_rdd_2_jpeg
as:
# NOTE(review): scraper-duplicated copy of the answer's fix; indentation lost.
def save_rdd_2_jpeg(rdd, save_dir):
# collect() brings samples to the driver so files land in save_dir locally.
for sample in rdd.collect():
save_nonlabelled_sample_2_jpeg(sample, save_dir)
then everything should work.
add a comment |
You can fix the issue you're having if you collect
all of your samples on to the driver node before you try to save them. If you redefine save_rdd_2_jpeg
as:
# NOTE(review): scraper-duplicated copy of the answer's fix; indentation lost.
def save_rdd_2_jpeg(rdd, save_dir):
# collect() brings samples to the driver so files land in save_dir locally.
for sample in rdd.collect():
save_nonlabelled_sample_2_jpeg(sample, save_dir)
then everything should work.
add a comment |
You can fix the issue you're having if you collect
all of your samples on to the driver node before you try to save them. If you redefine save_rdd_2_jpeg
as:
# NOTE(review): scraper-duplicated copy of the answer's fix; indentation lost.
def save_rdd_2_jpeg(rdd, save_dir):
# collect() brings samples to the driver so files land in save_dir locally.
for sample in rdd.collect():
save_nonlabelled_sample_2_jpeg(sample, save_dir)
then everything should work.
You can fix the issue you're having if you collect
all of your samples on to the driver node before you try to save them. If you redefine save_rdd_2_jpeg
as:
# NOTE(review): scraper-duplicated copy of the answer's fix; indentation lost.
def save_rdd_2_jpeg(rdd, save_dir):
# collect() brings samples to the driver so files land in save_dir locally.
for sample in rdd.collect():
save_nonlabelled_sample_2_jpeg(sample, save_dir)
then everything should work.
edited Dec 9 '18 at 2:13
answered Dec 9 '18 at 2:07
teltel
7,41621431
7,41621431
add a comment |
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53511027%2fsave-image-from-rdd-apache-spark-system%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
1
Are you running this on a single node or multiple nodes? Are you using a relative path or an absolute path?
– ayplam
Nov 28 '18 at 7:59
1
Given that you're running this on multiple nodes, you definitely cannot just call
img.save(filepath)
and expect it to work like it would in normal Python execution. Lord only knows where on your cluster your image files are actually ending up. Instead, you'll have tocollect
the image arrays on the driver node and only then runImage.fromarray
andimg.save
on those arrays.– tel
Dec 8 '18 at 4:04
I tried
rdd.collect().foreach(...)
instead ofrdd.foreach(...)
but I getlist object has not attribute foreach
– Luís Costa
Dec 8 '18 at 6:29
rdd.collect()
will return a standard Pythonlist
. You would then have to iterate over the contents of that list and call your image saving function on each element. See my answer below for more details.– tel
Dec 9 '18 at 2:15