Save image from RDD Apache Spark system
I want to retrieve the images that I have stored in my RDD after I map them.

I created a simple SparkSession in my main.py file, which calls the function preprocess_spark. That function returns an array of tuples named samples, each in the (slide_num, image) format, where image is an np.array that gets converted to an image in the save_jpeg_help function.



When I open the Apache Spark web UI, I see a job corresponding to the line:



rdd.foreach(lambda sample_element: save_nonlabelled_sample_2_jpeg(sample_element, save_dir))


but when it finishes, nothing is ever saved in my save_dir directory.



Any idea what I'm doing wrong?



Kind regards



main.py



import os
import numpy as np
from PIL import Image
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("Oncofinder -- Preprocessing")
         .getOrCreate())

samples = preprocess_spark(spark, [1])

if save_jpegs:  # set to True
    save_rdd_2_jpeg(samples, './data/images')


def save_rdd_2_jpeg(rdd, save_dir):
    rdd.foreach(lambda sample_element: save_nonlabelled_sample_2_jpeg(sample_element, save_dir))


def save_nonlabelled_sample_2_jpeg(sample, save_dir):
    slide_num, img_value = sample
    filename = '{slide_num}_{hash}.jpeg'.format(
        slide_num=slide_num, hash=np.random.randint(1e4))
    filepath = os.path.join(save_dir, filename)
    save_jpeg_help(img_value, filepath)


def save_jpeg_help(img_value, filepath):
    dir = os.path.dirname(filepath)
    os.makedirs(dir, exist_ok=True)
    img = Image.fromarray(img_value.astype(np.uint8), 'RGB')
    img.save(filepath)


def preprocess_spark(spark, slide_nums, folder="data", training=False, tile_size=1024, overlap=0,
                     tissue_threshold=0.9, sample_size=256, grayscale=False, normalize_stains=True,
                     num_partitions=20000):
    slides = (spark.sparkContext
              .parallelize(slide_nums)
              .filter(lambda slide: open_slide(slide, folder, training) is not None))
    tile_indices = (slides.flatMap(
        lambda slide: process_slide(slide, folder, training, tile_size, overlap)))
    tile_indices = tile_indices.repartition(num_partitions)
    tile_indices.cache()

    tiles = tile_indices.map(lambda tile_index: process_tile_index(tile_index, folder, training))
    filtered_tiles = tiles.filter(lambda tile: keep_tile(tile, tile_size, tissue_threshold))
    samples = filtered_tiles.flatMap(lambda tile: process_tile(tile, sample_size, grayscale))
    if normalize_stains:
        samples = samples.map(lambda sample: normalize_staining(sample))

    return samples


EDIT: I'm using



PYSPARK_PYTHON=python3 spark-submit --master spark://127.0.1.1:7077 spark_preprocessing.py



to run the application. It seems that after the foreach action, nothing else happens. Is there any reason for that?
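
One way to check where the files are actually ending up is to ask each partition for its hostname and for the absolute path that the relative save_dir resolves to on that machine. This is a quick diagnostic sketch, not part of the original code: it assumes the samples RDD from above, and the whereabouts helper is hypothetical.

import os
import socket

save_dir = './data/images'

def whereabouts(_partition):
    # Report the executor's hostname and where the relative
    # save_dir resolves on that machine.
    yield (socket.gethostname(), os.path.abspath(save_dir))

print(samples.mapPartitions(whereabouts).distinct().collect())

If the output lists worker hosts rather than the driver, the JPEGs are being written to the workers' local disks.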










Tags: python, apache-spark, pyspark

asked Nov 28 '18 at 2:04 by Luís Costa; edited Dec 9 '18 at 2:10 by tel
  • Are you running this on a single node or multiple nodes? Are you using a relative path or an absolute path?

    – ayplam, Nov 28 '18 at 7:59

  • Given that you're running this on multiple nodes, you definitely cannot just call img.save(filepath) and expect it to work like it would in normal Python execution. Lord only knows where on your cluster your image files are actually ending up. Instead, you'll have to collect the image arrays on the driver node and only then run Image.fromarray and img.save on those arrays.

    – tel, Dec 8 '18 at 4:04

  • I tried rdd.collect().foreach(...) instead of rdd.foreach(...), but I get 'list' object has no attribute 'foreach'.

    – Luís Costa, Dec 8 '18 at 6:29

  • rdd.collect() will return a standard Python list. You would then have to iterate over the contents of that list and call your image saving function on each element. See my answer below for more details.

    – tel, Dec 9 '18 at 2:15


















1 Answer
You can fix the issue you're having if you collect all of your samples onto the driver node before you try to save them. If you redefine save_rdd_2_jpeg as:

def save_rdd_2_jpeg(rdd, save_dir):
    for sample in rdd.collect():
        save_nonlabelled_sample_2_jpeg(sample, save_dir)

then everything should work.
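
This works because rdd.foreach runs on the executors, so the original code was writing the JPEGs to each worker's local filesystem, wherever the relative path './data/images' happened to resolve on that machine. Collecting first brings the arrays back to the driver, where an ordinary loop can save them. If the collected samples were ever too large to fit in driver memory, a variant along the same lines (a sketch, not from the original post) streams one partition at a time instead:

def save_rdd_2_jpeg(rdd, save_dir):
    # toLocalIterator() pulls partitions to the driver one at a time,
    # so only a single partition's samples sit in driver memory at once.
    for sample in rdd.toLocalIterator():
        save_nonlabelled_sample_2_jpeg(sample, save_dir)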






answered Dec 9 '18 at 2:07 by tel; edited Dec 9 '18 at 2:13































