Error while using WriteToBigquery in python for Dataflow pipeline. Unicode object has no attribute 'items'












0















My sample data is in json format and looks like:



{
"metadata": {
"action": "insert",
"type": "export",
"version": 1,
"timestamp": "2018-11-23T09:17:59.048-08:00"
},
"data": {
"attr1": 61,
"day": "2018-11-22",
"pin": "2C49956",
"CDP": 0,
"DP": 0,
"VD": 0,
"seo": 0,
"dir": 0,
"other": 0,
"at": 0
}
}


This is in a flat file and the objective is to run a dataflow pipeline in batch mode to insert the data into bigquery table. In one of the transformations where I want to take timestamp from the metadata and add it as a key value pair in the data section, I am getting the error from dataflow saying 'unicode object has no attribute 'items'.



The code looks as below:



import collections
import json
import argparse
import logging
from datetime import datetime
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions,
WorkerOptions
from apache_beam.io.gcp import bigquery


# Creating options object
def create_options(argv):
# pipeline options
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'something'
google_cloud_options.job_name = datetime.now().strftime('somename')
google_cloud_options.staging_location = 'some_loc'
google_cloud_options.temp_location = 'another_loc'
options.view_as(StandardOptions).runner = 'DirectRunner'
options.view_as(SetupOptions).save_main_session = True
options.view_as(WorkerOptions).machine_type = 'n1-standard-1'
return options

class PrepareData(beam.DoFn):
"""
ParDo function to create a dictionary of data for downstream consumption
"""

def process(self, element):
data = json.loads(element)
modified_data = {"action": data["metadata"]["action"], "timestamp": data["metadata"]["timestamp"], "data": data}
return [modified_data]


class FilterInserts(beam.DoFn):
"""
Filter data for inserts
"""

def process(self, element):
if element["action"] == "insert":
element['data']['data']['timestamp'] = element['timestamp']
# for dict in element["data"]["data"]:
# dict["timestamp"] = element["timestamp"]
return element["data"]["data"]


def run_pipe(options, argv):
"""
Creating pipelines
"""
p = beam.Pipeline(options=options)

main_pipe =p | 'PREPARE_DATA' >> beam.io.ReadFromText('/home/Downloads/sample_1') | beam.ParDo(PrepareData())

""" Separating pipes for various actions """
insert_pipe= main_pipe | beam.ParDo(FilterInserts())

"""
Inserts--> sinking to BQ
"""
insert_pipe | 'INSERT' >> beam.io.WriteToBigQuery(
project='some-data-warehouse',
dataset='sample_data',
table='sample',
write_disposition='WRITE_APPEND',
create_disposition='CREATE_IF_NEEDED')


p.run()


def main():
"""
Main function to drive the run
:return: errors if any
"""
parser = argparse.ArgumentParser()
args = parser.parse_args()
try:
# create options
opt = create_options(argv=args)
# run pipeline
run_pipe(opt, argv=args)
except Exception as e:
logging.error('Pipeline failed with error : %s', e)
raise Exception('Pipeline failed with error : %s', e)


if __name__ == "__main__":
main()


I am running this on direct runner to test on local but i get the same error even if i change the runner to dataflow-runner.
The error message is:



Exception: ('Pipeline failed with error : %s', AttributeError(u"'unicode' object has no attribute 'items' [while running 'INSERT/WriteToBigQuery']",))


Can anyone help me figure out what's going wrong and how i can fix this?










share|improve this question



























    0















    My sample data is in json format and looks like:



    {
    "metadata": {
    "action": "insert",
    "type": "export",
    "version": 1,
    "timestamp": "2018-11-23T09:17:59.048-08:00"
    },
    "data": {
    "attr1": 61,
    "day": "2018-11-22",
    "pin": "2C49956",
    "CDP": 0,
    "DP": 0,
    "VD": 0,
    "seo": 0,
    "dir": 0,
    "other": 0,
    "at": 0
    }
    }


    This is in a flat file and the objective is to run a dataflow pipeline in batch mode to insert the data into bigquery table. In one of the transformations where I want to take timestamp from the metadata and add it as a key value pair in the data section, I am getting the error from dataflow saying 'unicode object has no attribute 'items'.



    The code looks as below:



    import collections
    import json
    import argparse
    import logging
    from datetime import datetime
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions,
    WorkerOptions
    from apache_beam.io.gcp import bigquery


    # Creating options object
    def create_options(argv):
    # pipeline options
    options = PipelineOptions()
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = 'something'
    google_cloud_options.job_name = datetime.now().strftime('somename')
    google_cloud_options.staging_location = 'some_loc'
    google_cloud_options.temp_location = 'another_loc'
    options.view_as(StandardOptions).runner = 'DirectRunner'
    options.view_as(SetupOptions).save_main_session = True
    options.view_as(WorkerOptions).machine_type = 'n1-standard-1'
    return options

    class PrepareData(beam.DoFn):
    """
    ParDo function to create a dictionary of data for downstream consumption
    """

    def process(self, element):
    data = json.loads(element)
    modified_data = {"action": data["metadata"]["action"], "timestamp": data["metadata"]["timestamp"], "data": data}
    return [modified_data]


    class FilterInserts(beam.DoFn):
    """
    Filter data for inserts
    """

    def process(self, element):
    if element["action"] == "insert":
    element['data']['data']['timestamp'] = element['timestamp']
    # for dict in element["data"]["data"]:
    # dict["timestamp"] = element["timestamp"]
    return element["data"]["data"]


    def run_pipe(options, argv):
    """
    Creating pipelines
    """
    p = beam.Pipeline(options=options)

    main_pipe =p | 'PREPARE_DATA' >> beam.io.ReadFromText('/home/Downloads/sample_1') | beam.ParDo(PrepareData())

    """ Separating pipes for various actions """
    insert_pipe= main_pipe | beam.ParDo(FilterInserts())

    """
    Inserts--> sinking to BQ
    """
    insert_pipe | 'INSERT' >> beam.io.WriteToBigQuery(
    project='some-data-warehouse',
    dataset='sample_data',
    table='sample',
    write_disposition='WRITE_APPEND',
    create_disposition='CREATE_IF_NEEDED')


    p.run()


    def main():
    """
    Main function to drive the run
    :return: errors if any
    """
    parser = argparse.ArgumentParser()
    args = parser.parse_args()
    try:
    # create options
    opt = create_options(argv=args)
    # run pipeline
    run_pipe(opt, argv=args)
    except Exception as e:
    logging.error('Pipeline failed with error : %s', e)
    raise Exception('Pipeline failed with error : %s', e)


    if __name__ == "__main__":
    main()


    I am running this on direct runner to test on local but i get the same error even if i change the runner to dataflow-runner.
    The error message is:



    Exception: ('Pipeline failed with error : %s', AttributeError(u"'unicode' object has no attribute 'items' [while running 'INSERT/WriteToBigQuery']",))


    Can anyone help me figure out what's going wrong and how i can fix this?










    share|improve this question

























      0












      0








      0








      My sample data is in json format and looks like:



      {
      "metadata": {
      "action": "insert",
      "type": "export",
      "version": 1,
      "timestamp": "2018-11-23T09:17:59.048-08:00"
      },
      "data": {
      "attr1": 61,
      "day": "2018-11-22",
      "pin": "2C49956",
      "CDP": 0,
      "DP": 0,
      "VD": 0,
      "seo": 0,
      "dir": 0,
      "other": 0,
      "at": 0
      }
      }


      This is in a flat file and the objective is to run a dataflow pipeline in batch mode to insert the data into bigquery table. In one of the transformations where I want to take timestamp from the metadata and add it as a key value pair in the data section, I am getting the error from dataflow saying 'unicode object has no attribute 'items'.



      The code looks as below:



      import collections
      import json
      import argparse
      import logging
      from datetime import datetime
      import apache_beam as beam
      from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions,
      WorkerOptions
      from apache_beam.io.gcp import bigquery


      # Creating options object
      def create_options(argv):
      # pipeline options
      options = PipelineOptions()
      google_cloud_options = options.view_as(GoogleCloudOptions)
      google_cloud_options.project = 'something'
      google_cloud_options.job_name = datetime.now().strftime('somename')
      google_cloud_options.staging_location = 'some_loc'
      google_cloud_options.temp_location = 'another_loc'
      options.view_as(StandardOptions).runner = 'DirectRunner'
      options.view_as(SetupOptions).save_main_session = True
      options.view_as(WorkerOptions).machine_type = 'n1-standard-1'
      return options

      class PrepareData(beam.DoFn):
      """
      ParDo function to create a dictionary of data for downstream consumption
      """

      def process(self, element):
      data = json.loads(element)
      modified_data = {"action": data["metadata"]["action"], "timestamp": data["metadata"]["timestamp"], "data": data}
      return [modified_data]


      class FilterInserts(beam.DoFn):
      """
      Filter data for inserts
      """

      def process(self, element):
      if element["action"] == "insert":
      element['data']['data']['timestamp'] = element['timestamp']
      # for dict in element["data"]["data"]:
      # dict["timestamp"] = element["timestamp"]
      return element["data"]["data"]


      def run_pipe(options, argv):
      """
      Creating pipelines
      """
      p = beam.Pipeline(options=options)

      main_pipe =p | 'PREPARE_DATA' >> beam.io.ReadFromText('/home/Downloads/sample_1') | beam.ParDo(PrepareData())

      """ Separating pipes for various actions """
      insert_pipe= main_pipe | beam.ParDo(FilterInserts())

      """
      Inserts--> sinking to BQ
      """
      insert_pipe | 'INSERT' >> beam.io.WriteToBigQuery(
      project='some-data-warehouse',
      dataset='sample_data',
      table='sample',
      write_disposition='WRITE_APPEND',
      create_disposition='CREATE_IF_NEEDED')


      p.run()


      def main():
      """
      Main function to drive the run
      :return: errors if any
      """
      parser = argparse.ArgumentParser()
      args = parser.parse_args()
      try:
      # create options
      opt = create_options(argv=args)
      # run pipeline
      run_pipe(opt, argv=args)
      except Exception as e:
      logging.error('Pipeline failed with error : %s', e)
      raise Exception('Pipeline failed with error : %s', e)


      if __name__ == "__main__":
      main()


      I am running this on direct runner to test on local but i get the same error even if i change the runner to dataflow-runner.
      The error message is:



      Exception: ('Pipeline failed with error : %s', AttributeError(u"'unicode' object has no attribute 'items' [while running 'INSERT/WriteToBigQuery']",))


      Can anyone help me figure out what's going wrong and how i can fix this?










      share|improve this question














      My sample data is in json format and looks like:



      {
      "metadata": {
      "action": "insert",
      "type": "export",
      "version": 1,
      "timestamp": "2018-11-23T09:17:59.048-08:00"
      },
      "data": {
      "attr1": 61,
      "day": "2018-11-22",
      "pin": "2C49956",
      "CDP": 0,
      "DP": 0,
      "VD": 0,
      "seo": 0,
      "dir": 0,
      "other": 0,
      "at": 0
      }
      }


      This is in a flat file and the objective is to run a dataflow pipeline in batch mode to insert the data into bigquery table. In one of the transformations where I want to take timestamp from the metadata and add it as a key value pair in the data section, I am getting the error from dataflow saying 'unicode object has no attribute 'items'.



      The code looks as below:



      import collections
      import json
      import argparse
      import logging
      from datetime import datetime
      import apache_beam as beam
      from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions,
      WorkerOptions
      from apache_beam.io.gcp import bigquery


      # Creating options object
      def create_options(argv):
      # pipeline options
      options = PipelineOptions()
      google_cloud_options = options.view_as(GoogleCloudOptions)
      google_cloud_options.project = 'something'
      google_cloud_options.job_name = datetime.now().strftime('somename')
      google_cloud_options.staging_location = 'some_loc'
      google_cloud_options.temp_location = 'another_loc'
      options.view_as(StandardOptions).runner = 'DirectRunner'
      options.view_as(SetupOptions).save_main_session = True
      options.view_as(WorkerOptions).machine_type = 'n1-standard-1'
      return options

      class PrepareData(beam.DoFn):
      """
      ParDo function to create a dictionary of data for downstream consumption
      """

      def process(self, element):
      data = json.loads(element)
      modified_data = {"action": data["metadata"]["action"], "timestamp": data["metadata"]["timestamp"], "data": data}
      return [modified_data]


      class FilterInserts(beam.DoFn):
      """
      Filter data for inserts
      """

      def process(self, element):
      if element["action"] == "insert":
      element['data']['data']['timestamp'] = element['timestamp']
      # for dict in element["data"]["data"]:
      # dict["timestamp"] = element["timestamp"]
      return element["data"]["data"]


      def run_pipe(options, argv):
      """
      Creating pipelines
      """
      p = beam.Pipeline(options=options)

      main_pipe =p | 'PREPARE_DATA' >> beam.io.ReadFromText('/home/Downloads/sample_1') | beam.ParDo(PrepareData())

      """ Separating pipes for various actions """
      insert_pipe= main_pipe | beam.ParDo(FilterInserts())

      """
      Inserts--> sinking to BQ
      """
      insert_pipe | 'INSERT' >> beam.io.WriteToBigQuery(
      project='some-data-warehouse',
      dataset='sample_data',
      table='sample',
      write_disposition='WRITE_APPEND',
      create_disposition='CREATE_IF_NEEDED')


      p.run()


      def main():
      """
      Main function to drive the run
      :return: errors if any
      """
      parser = argparse.ArgumentParser()
      args = parser.parse_args()
      try:
      # create options
      opt = create_options(argv=args)
      # run pipeline
      run_pipe(opt, argv=args)
      except Exception as e:
      logging.error('Pipeline failed with error : %s', e)
      raise Exception('Pipeline failed with error : %s', e)


      if __name__ == "__main__":
      main()


      I am running this on direct runner to test on local but i get the same error even if i change the runner to dataflow-runner.
      The error message is:



      Exception: ('Pipeline failed with error : %s', AttributeError(u"'unicode' object has no attribute 'items' [while running 'INSERT/WriteToBigQuery']",))


      Can anyone help me figure out what's going wrong and how i can fix this?







      python json google-bigquery google-cloud-dataflow apache-beam






      share|improve this question













      share|improve this question











      share|improve this question




      share|improve this question










      asked Nov 25 '18 at 1:08









      Utsav ChatterjeeUtsav Chatterjee

      6210




      6210
























          1 Answer
          1






          active

          oldest

          votes


















          1














          By using the following table's schema (You can modify it according to your necessities):



          schema = 'VD:INTEGER,pin:STRING,timestamp:STRING,other:INTEGER,CDP:INTEGER,dir:INTEGER,attr1:INTEGER,seo:INTEGER,day:STRING,DP:INTEGER,at:INTEGER'


          Try the following on your FilterInserts class:



          class FilterInserts(beam.DoFn):
          """
          Filter data for inserts
          """

          def process(self, element):
          if element["action"] == "insert":
          element['data']['data']['timestamp'] = element['timestamp']

          return [{
          'VD': element['data']['data']['VD'],
          'pin': element['data']['data']['pin'],
          'timestamp': element['data']['data']['timestamp'],
          'other': element['data']['data']['other'],
          'CDP': element['data']['data']['CDP'],
          'dir': element['data']['data']['dir'],
          'attr1' : element['data']['data']['attr1'],
          'seo' : element['data']['data']['seo'],
          'day' : element['data']['data']['day'],
          'DP' : element['data']['data']['DP'],
          'at' : element['data']['data']['at'],
          }]


          The issue happens since you need to send a Key Value array to BigQuery, and you were sending a JSON dictionary with unicode strings.



          Hope it helps.






          share|improve this answer























            Your Answer






            StackExchange.ifUsing("editor", function () {
            StackExchange.using("externalEditor", function () {
            StackExchange.using("snippets", function () {
            StackExchange.snippets.init();
            });
            });
            }, "code-snippets");

            StackExchange.ready(function() {
            var channelOptions = {
            tags: "".split(" "),
            id: "1"
            };
            initTagRenderer("".split(" "), "".split(" "), channelOptions);

            StackExchange.using("externalEditor", function() {
            // Have to fire editor after snippets, if snippets enabled
            if (StackExchange.settings.snippets.snippetsEnabled) {
            StackExchange.using("snippets", function() {
            createEditor();
            });
            }
            else {
            createEditor();
            }
            });

            function createEditor() {
            StackExchange.prepareEditor({
            heartbeatType: 'answer',
            autoActivateHeartbeat: false,
            convertImagesToLinks: true,
            noModals: true,
            showLowRepImageUploadWarning: true,
            reputationToPostImages: 10,
            bindNavPrevention: true,
            postfix: "",
            imageUploader: {
            brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
            contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
            allowUrls: true
            },
            onDemand: true,
            discardSelector: ".discard-answer"
            ,immediatelyShowMarkdownHelp:true
            });


            }
            });














            draft saved

            draft discarded


















            StackExchange.ready(
            function () {
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53463824%2ferror-while-using-writetobigquery-in-python-for-dataflow-pipeline-unicode-objec%23new-answer', 'question_page');
            }
            );

            Post as a guest















            Required, but never shown

























            1 Answer
            1






            active

            oldest

            votes








            1 Answer
            1






            active

            oldest

            votes









            active

            oldest

            votes






            active

            oldest

            votes









            1














            By using the following table's schema (You can modify it according to your necessities):



            schema = 'VD:INTEGER,pin:STRING,timestamp:STRING,other:INTEGER,CDP:INTEGER,dir:INTEGER,attr1:INTEGER,seo:INTEGER,day:STRING,DP:INTEGER,at:INTEGER'


            Try the following on your FilterInserts class:



            class FilterInserts(beam.DoFn):
            """
            Filter data for inserts
            """

            def process(self, element):
            if element["action"] == "insert":
            element['data']['data']['timestamp'] = element['timestamp']

            return [{
            'VD': element['data']['data']['VD'],
            'pin': element['data']['data']['pin'],
            'timestamp': element['data']['data']['timestamp'],
            'other': element['data']['data']['other'],
            'CDP': element['data']['data']['CDP'],
            'dir': element['data']['data']['dir'],
            'attr1' : element['data']['data']['attr1'],
            'seo' : element['data']['data']['seo'],
            'day' : element['data']['data']['day'],
            'DP' : element['data']['data']['DP'],
            'at' : element['data']['data']['at'],
            }]


            The issue happens since you need to send a Key Value array to BigQuery, and you were sending a JSON dictionary with unicode strings.



            Hope it helps.






            share|improve this answer




























              1














              By using the following table's schema (You can modify it according to your necessities):



              schema = 'VD:INTEGER,pin:STRING,timestamp:STRING,other:INTEGER,CDP:INTEGER,dir:INTEGER,attr1:INTEGER,seo:INTEGER,day:STRING,DP:INTEGER,at:INTEGER'


              Try the following on your FilterInserts class:



              class FilterInserts(beam.DoFn):
              """
              Filter data for inserts
              """

              def process(self, element):
              if element["action"] == "insert":
              element['data']['data']['timestamp'] = element['timestamp']

              return [{
              'VD': element['data']['data']['VD'],
              'pin': element['data']['data']['pin'],
              'timestamp': element['data']['data']['timestamp'],
              'other': element['data']['data']['other'],
              'CDP': element['data']['data']['CDP'],
              'dir': element['data']['data']['dir'],
              'attr1' : element['data']['data']['attr1'],
              'seo' : element['data']['data']['seo'],
              'day' : element['data']['data']['day'],
              'DP' : element['data']['data']['DP'],
              'at' : element['data']['data']['at'],
              }]


              The issue happens since you need to send a Key Value array to BigQuery, and you were sending a JSON dictionary with unicode strings.



              Hope it helps.






              share|improve this answer


























                1












                1








                1







                By using the following table's schema (You can modify it according to your necessities):



                schema = 'VD:INTEGER,pin:STRING,timestamp:STRING,other:INTEGER,CDP:INTEGER,dir:INTEGER,attr1:INTEGER,seo:INTEGER,day:STRING,DP:INTEGER,at:INTEGER'


                Try the following on your FilterInserts class:



                class FilterInserts(beam.DoFn):
                """
                Filter data for inserts
                """

                def process(self, element):
                if element["action"] == "insert":
                element['data']['data']['timestamp'] = element['timestamp']

                return [{
                'VD': element['data']['data']['VD'],
                'pin': element['data']['data']['pin'],
                'timestamp': element['data']['data']['timestamp'],
                'other': element['data']['data']['other'],
                'CDP': element['data']['data']['CDP'],
                'dir': element['data']['data']['dir'],
                'attr1' : element['data']['data']['attr1'],
                'seo' : element['data']['data']['seo'],
                'day' : element['data']['data']['day'],
                'DP' : element['data']['data']['DP'],
                'at' : element['data']['data']['at'],
                }]


                The issue happens since you need to send a Key Value array to BigQuery, and you were sending a JSON dictionary with unicode strings.



                Hope it helps.






                share|improve this answer













                By using the following table's schema (You can modify it according to your necessities):



                schema = 'VD:INTEGER,pin:STRING,timestamp:STRING,other:INTEGER,CDP:INTEGER,dir:INTEGER,attr1:INTEGER,seo:INTEGER,day:STRING,DP:INTEGER,at:INTEGER'


                Try the following on your FilterInserts class:



                class FilterInserts(beam.DoFn):
                """
                Filter data for inserts
                """

                def process(self, element):
                if element["action"] == "insert":
                element['data']['data']['timestamp'] = element['timestamp']

                return [{
                'VD': element['data']['data']['VD'],
                'pin': element['data']['data']['pin'],
                'timestamp': element['data']['data']['timestamp'],
                'other': element['data']['data']['other'],
                'CDP': element['data']['data']['CDP'],
                'dir': element['data']['data']['dir'],
                'attr1' : element['data']['data']['attr1'],
                'seo' : element['data']['data']['seo'],
                'day' : element['data']['data']['day'],
                'DP' : element['data']['data']['DP'],
                'at' : element['data']['data']['at'],
                }]


                The issue happens since you need to send a Key Value array to BigQuery, and you were sending a JSON dictionary with unicode strings.



                Hope it helps.







                share|improve this answer












                share|improve this answer



                share|improve this answer










                answered Nov 26 '18 at 20:33









                F10F10

                1,2762414




                1,2762414






























                    draft saved

                    draft discarded




















































                    Thanks for contributing an answer to Stack Overflow!


                    • Please be sure to answer the question. Provide details and share your research!

                    But avoid



                    • Asking for help, clarification, or responding to other answers.

                    • Making statements based on opinion; back them up with references or personal experience.


                    To learn more, see our tips on writing great answers.




                    draft saved


                    draft discarded














                    StackExchange.ready(
                    function () {
                    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53463824%2ferror-while-using-writetobigquery-in-python-for-dataflow-pipeline-unicode-objec%23new-answer', 'question_page');
                    }
                    );

                    Post as a guest















                    Required, but never shown





















































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown

































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown







                    Popular posts from this blog

                    A CLEAN and SIMPLE way to add appendices to Table of Contents and bookmarks

                    Calculate evaluation metrics using cross_val_predict sklearn

                    Insert data from modal to MySQL (multiple modal on website)