How to transform structured streams with PySpark?












2














This seems like it should be obvious, but in reviewing the docs and examples, I'm not sure I can find a way to take a structured stream and transform using PySpark.



For example:



from pyspark.sql import SparkSession

spark = (
SparkSession
.builder
.appName('StreamingWordCount')
.getOrCreate()
)

raw_records = (
spark
.readStream
.format('socket')
.option('host', 'localhost')
.option('port', 9999)
.load()
)

# I realize there's a SQL function for upper-case, just illustrating a sample
# use of an arbitrary map function
records = raw_records.rdd.map(lambda w: w.upper()).toDF()

counts = (
records
.groupBy(records.value)
.count()
)

query = (
counts
.writeStream
.outputMode('complete')
.format('console')
.start()
)
query.awaitTermination()


This will throw the following exception:



Queries with streaming sources must be executed with writeStream.start


However, if I remove the call to rdd.map(...).toDF() things seem to work fine.



Seems as though the call to rdd.map branched execution from the streaming context and causes Spark to warn that it was never started?



Is there a "right" way to apply map or mapPartition style transformations using Structured Streaming and PySpark?










share|improve this question





























    2














    This seems like it should be obvious, but in reviewing the docs and examples, I'm not sure I can find a way to take a structured stream and transform using PySpark.



    For example:



    from pyspark.sql import SparkSession

    spark = (
    SparkSession
    .builder
    .appName('StreamingWordCount')
    .getOrCreate()
    )

    raw_records = (
    spark
    .readStream
    .format('socket')
    .option('host', 'localhost')
    .option('port', 9999)
    .load()
    )

    # I realize there's a SQL function for upper-case, just illustrating a sample
    # use of an arbitrary map function
    records = raw_records.rdd.map(lambda w: w.upper()).toDF()

    counts = (
    records
    .groupBy(records.value)
    .count()
    )

    query = (
    counts
    .writeStream
    .outputMode('complete')
    .format('console')
    .start()
    )
    query.awaitTermination()


    This will throw the following exception:



    Queries with streaming sources must be executed with writeStream.start


    However, if I remove the call to rdd.map(...).toDF() things seem to work fine.



    Seems as though the call to rdd.map branched execution from the streaming context and causes Spark to warn that it was never started?



    Is there a "right" way to apply map or mapPartition style transformations using Structured Streaming and PySpark?










    share|improve this question



























      2












      2








      2


      1





      This seems like it should be obvious, but in reviewing the docs and examples, I'm not sure I can find a way to take a structured stream and transform using PySpark.



      For example:



      from pyspark.sql import SparkSession

      spark = (
      SparkSession
      .builder
      .appName('StreamingWordCount')
      .getOrCreate()
      )

      raw_records = (
      spark
      .readStream
      .format('socket')
      .option('host', 'localhost')
      .option('port', 9999)
      .load()
      )

      # I realize there's a SQL function for upper-case, just illustrating a sample
      # use of an arbitrary map function
      records = raw_records.rdd.map(lambda w: w.upper()).toDF()

      counts = (
      records
      .groupBy(records.value)
      .count()
      )

      query = (
      counts
      .writeStream
      .outputMode('complete')
      .format('console')
      .start()
      )
      query.awaitTermination()


      This will throw the following exception:



      Queries with streaming sources must be executed with writeStream.start


      However, if I remove the call to rdd.map(...).toDF() things seem to work fine.



      Seems as though the call to rdd.map branched execution from the streaming context and causes Spark to warn that it was never started?



      Is there a "right" way to apply map or mapPartition style transformations using Structured Streaming and PySpark?










      share|improve this question















      This seems like it should be obvious, but in reviewing the docs and examples, I'm not sure I can find a way to take a structured stream and transform using PySpark.



      For example:



      from pyspark.sql import SparkSession

      spark = (
      SparkSession
      .builder
      .appName('StreamingWordCount')
      .getOrCreate()
      )

      raw_records = (
      spark
      .readStream
      .format('socket')
      .option('host', 'localhost')
      .option('port', 9999)
      .load()
      )

      # I realize there's a SQL function for upper-case, just illustrating a sample
      # use of an arbitrary map function
      records = raw_records.rdd.map(lambda w: w.upper()).toDF()

      counts = (
      records
      .groupBy(records.value)
      .count()
      )

      query = (
      counts
      .writeStream
      .outputMode('complete')
      .format('console')
      .start()
      )
      query.awaitTermination()


      This will throw the following exception:



      Queries with streaming sources must be executed with writeStream.start


      However, if I remove the call to rdd.map(...).toDF() things seem to work fine.



      Seems as though the call to rdd.map branched execution from the streaming context and causes Spark to warn that it was never started?



      Is there a "right" way to apply map or mapPartition style transformations using Structured Streaming and PySpark?







      apache-spark pyspark spark-structured-streaming






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Oct 28 '18 at 11:33









      user6910411

      32.8k86995




      32.8k86995










      asked Jul 25 '18 at 17:56









      Mike Sukmanowsky

      5532820




      5532820
























          1 Answer
          1






          active

          oldest

          votes


















          4














          Every transformation that is applied in Structured Streaming has to be fully contained in Dataset world - in case of PySpark it means you can use only DataFrame or SQL and conversion to RDD (or DStream or local collections) are not supported.



          If you want to use plain Python code you have to use UserDefinedFunction.



          from pyspark.sql.functions import udf

          @udf
          def to_upper(s)
          return s.upper()

          raw_records.select(to_upper("value"))


          See also Spark Structured Streaming and Spark-Ml Regression






          share|improve this answer























            Your Answer






            StackExchange.ifUsing("editor", function () {
            StackExchange.using("externalEditor", function () {
            StackExchange.using("snippets", function () {
            StackExchange.snippets.init();
            });
            });
            }, "code-snippets");

            StackExchange.ready(function() {
            var channelOptions = {
            tags: "".split(" "),
            id: "1"
            };
            initTagRenderer("".split(" "), "".split(" "), channelOptions);

            StackExchange.using("externalEditor", function() {
            // Have to fire editor after snippets, if snippets enabled
            if (StackExchange.settings.snippets.snippetsEnabled) {
            StackExchange.using("snippets", function() {
            createEditor();
            });
            }
            else {
            createEditor();
            }
            });

            function createEditor() {
            StackExchange.prepareEditor({
            heartbeatType: 'answer',
            autoActivateHeartbeat: false,
            convertImagesToLinks: true,
            noModals: true,
            showLowRepImageUploadWarning: true,
            reputationToPostImages: 10,
            bindNavPrevention: true,
            postfix: "",
            imageUploader: {
            brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
            contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
            allowUrls: true
            },
            onDemand: true,
            discardSelector: ".discard-answer"
            ,immediatelyShowMarkdownHelp:true
            });


            }
            });














            draft saved

            draft discarded


















            StackExchange.ready(
            function () {
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f51525042%2fhow-to-transform-structured-streams-with-pyspark%23new-answer', 'question_page');
            }
            );

            Post as a guest















            Required, but never shown

























            1 Answer
            1






            active

            oldest

            votes








            1 Answer
            1






            active

            oldest

            votes









            active

            oldest

            votes






            active

            oldest

            votes









            4














            Every transformation that is applied in Structured Streaming has to be fully contained in Dataset world - in case of PySpark it means you can use only DataFrame or SQL and conversion to RDD (or DStream or local collections) are not supported.



            If you want to use plain Python code you have to use UserDefinedFunction.



            from pyspark.sql.functions import udf

            @udf
            def to_upper(s)
            return s.upper()

            raw_records.select(to_upper("value"))


            See also Spark Structured Streaming and Spark-Ml Regression






            share|improve this answer




























              4














              Every transformation that is applied in Structured Streaming has to be fully contained in Dataset world - in case of PySpark it means you can use only DataFrame or SQL and conversion to RDD (or DStream or local collections) are not supported.



              If you want to use plain Python code you have to use UserDefinedFunction.



              from pyspark.sql.functions import udf

              @udf
              def to_upper(s)
              return s.upper()

              raw_records.select(to_upper("value"))


              See also Spark Structured Streaming and Spark-Ml Regression






              share|improve this answer


























                4












                4








                4






                Every transformation that is applied in Structured Streaming has to be fully contained in Dataset world - in case of PySpark it means you can use only DataFrame or SQL and conversion to RDD (or DStream or local collections) are not supported.



                If you want to use plain Python code you have to use UserDefinedFunction.



                from pyspark.sql.functions import udf

                @udf
                def to_upper(s)
                return s.upper()

                raw_records.select(to_upper("value"))


                See also Spark Structured Streaming and Spark-Ml Regression






                share|improve this answer














                Every transformation that is applied in Structured Streaming has to be fully contained in Dataset world - in case of PySpark it means you can use only DataFrame or SQL and conversion to RDD (or DStream or local collections) are not supported.



                If you want to use plain Python code you have to use UserDefinedFunction.



                from pyspark.sql.functions import udf

                @udf
                def to_upper(s)
                return s.upper()

                raw_records.select(to_upper("value"))


                See also Spark Structured Streaming and Spark-Ml Regression







                share|improve this answer














                share|improve this answer



                share|improve this answer








                edited Nov 23 '18 at 17:22









                user6910411

                32.8k86995




                32.8k86995










                answered Jul 25 '18 at 21:36









                user10135885

                561




                561






























                    draft saved

                    draft discarded




















































                    Thanks for contributing an answer to Stack Overflow!


                    • Please be sure to answer the question. Provide details and share your research!

                    But avoid



                    • Asking for help, clarification, or responding to other answers.

                    • Making statements based on opinion; back them up with references or personal experience.


                    To learn more, see our tips on writing great answers.





                    Some of your past answers have not been well-received, and you're in danger of being blocked from answering.


                    Please pay close attention to the following guidance:


                    • Please be sure to answer the question. Provide details and share your research!

                    But avoid



                    • Asking for help, clarification, or responding to other answers.

                    • Making statements based on opinion; back them up with references or personal experience.


                    To learn more, see our tips on writing great answers.




                    draft saved


                    draft discarded














                    StackExchange.ready(
                    function () {
                    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f51525042%2fhow-to-transform-structured-streams-with-pyspark%23new-answer', 'question_page');
                    }
                    );

                    Post as a guest















                    Required, but never shown





















































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown

































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown







                    Popular posts from this blog

                    A CLEAN and SIMPLE way to add appendices to Table of Contents and bookmarks

                    Calculate evaluation metrics using cross_val_predict sklearn

                    Insert data from modal to MySQL (multiple modal on website)