Spark - combine filter results from all executors












I have a Spark Streaming job with 3 executors that consumes from Kafka. The executor count depends on the partition count of the topic. When a message is consumed from the topic, I start a query on Hazelcast. Every executor finds results from a filtering operation on Hazelcast, and the executors return duplicated results, because the status of the data has not yet been updated when one executor returns it, so another executor finds the same data.



My question: is there a way to combine the results found by all executors during streaming into a single list?







java apache-spark hazelcast














edited Nov 23 at 11:21
asked Nov 23 at 9:00 by Masay












  • Use accumulators. Please share your code.
    – Taha Naqvi
    Nov 23 at 10:23












  • Thanks for your comment. I have added detail to my question. Accumulators are still on the table, and I am reading about them.
    – Masay
    Nov 23 at 11:22






























2 Answers
Spark executors are distributed across the cluster, so deduplicating data across executors is difficult. You have the following options:




1. Use accumulators. The problem is that accumulator values are not consistent while the job is running, so you may end up reading stale data.

2. Offload the work to an external system: store your output in external storage that can deduplicate it (probably HBase). The efficiency of this storage system then becomes key.
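As a rough illustration of option 2, here is a minimal sketch in plain Java. It is not Spark, HBase, or Hazelcast code: the class `ClaimDedupSketch`, the method `claim`, and the in-memory `ConcurrentHashMap` are hypothetical stand-ins for an external store that accepts each record ID only once. Three threads play the role of the executors, and all of them find the same candidate records. Because `putIfAbsent` is atomic, only the first worker to claim an ID adds it to the combined list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ClaimDedupSketch {

    // Hypothetical stand-in for the external store (not a real HBase or Hazelcast client).
    private final ConcurrentMap<String, String> claimedBy = new ConcurrentHashMap<>();

    // Returns true only for the first worker that claims this record ID.
    boolean claim(String recordId, String workerId) {
        return claimedBy.putIfAbsent(recordId, workerId) == null;
    }

    // Simulates several executors that all find the same candidate records.
    static List<String> runWorkers(int workerCount, List<String> candidates) {
        ClaimDedupSketch store = new ClaimDedupSketch();
        List<String> combined = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(workerCount);
        for (int w = 0; w < workerCount; w++) {
            String workerId = "executor-" + w;
            pool.submit(() -> {
                for (String id : candidates) {
                    // Keep the record only if this worker won the claim.
                    if (store.claim(id, workerId)) {
                        combined.add(id);
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        // Sort so the result does not depend on thread scheduling.
        List<String> sorted = new ArrayList<>(combined);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        List<String> result = runWorkers(3, Arrays.asList("a", "b", "c", "d"));
        System.out.println(result); // prints [a, b, c, d]
    }
}
```

Hazelcast's `IMap` is a `ConcurrentMap` and also offers `putIfAbsent`, so a similar claim step might be possible directly in Hazelcast. Whether that fits depends on how the data statuses are stored, which the question does not show.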


I hope this helps.

















answered Nov 28 at 6:47 by Harjeet Kumar

























To avoid reading duplicate data, you need to maintain the offsets somewhere, preferably in HBase. Every time you consume data from Kafka, first read the stored offsets from HBase, check which offset has already been consumed for each topic and partition, and then start reading and writing from there. After each successful write, you must update the stored offset.
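The offset bookkeeping described above can be sketched in plain Java as follows. This is only an illustration under assumptions: `OffsetTrackingSketch` and its methods are hypothetical, the `HashMap` stands in for the HBase offset table, the `ArrayList` stands in for the output sink, and offsets are tracked per topic and partition. No Kafka or HBase client is used.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class OffsetTrackingSketch {

    // Hypothetical stand-in for the offset table (for example an HBase table).
    private final Map<String, Long> lastProcessed = new HashMap<>();
    // Hypothetical stand-in for the output sink.
    private final List<String> written = new ArrayList<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Processes one record; returns false if its offset was already handled.
    boolean process(String topic, int partition, long offset, String value) {
        String k = key(topic, partition);
        Long last = lastProcessed.get(k);
        if (last != null && offset <= last) {
            return false; // replayed record: skip it
        }
        written.add(value);           // 1. write the result
        lastProcessed.put(k, offset); // 2. only then record the offset
        return true;
    }

    // Returns a copy of everything written so far.
    List<String> written() {
        return new ArrayList<>(written);
    }

    public static void main(String[] args) {
        OffsetTrackingSketch tracker = new OffsetTrackingSketch();
        tracker.process("events", 0, 0L, "m0");
        tracker.process("events", 0, 1L, "m1");
        tracker.process("events", 0, 1L, "m1"); // redelivered, skipped
        tracker.process("events", 1, 0L, "n0"); // other partition has its own offset
        System.out.println(tracker.written()); // prints [m0, m1, n0]
    }
}
```

In this sketch the write and the offset update are two separate steps, so a crash between them would replay the last record on restart. A real setup would need either an idempotent write or a store that can update both atomically.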



Do you think that solves the issue?

















answered Nov 29 at 14:24 by H Roy





























