Cloud Dataflow writing to BigQuery Python error
I'm writing a simple Beam job to copy data from a GCS bucket into BigQuery. The code looks like this:
import sys

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions

pipeline_options = GoogleCloudOptions(flags=sys.argv[1:])
pipeline_options.project = PROJECT_ID
pipeline_options.region = 'us-west1'
pipeline_options.job_name = JOB_NAME
pipeline_options.staging_location = BUCKET + '/binaries'
pipeline_options.temp_location = BUCKET + '/temp'

schema = 'id:INTEGER,region:STRING,population:INTEGER,sex:STRING,age:INTEGER,education:STRING,income:FLOAT,statusquo:FLOAT,vote:STRING'

p = (beam.Pipeline(options=pipeline_options)
     | 'ReadFromGCS' >> beam.io.textio.ReadFromText('Chile.csv')
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('project:tmp.dummy', schema=schema))
where we write to the tmp.dummy table in our project. This produces the following stack trace:
Traceback (most recent call last):
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 151, in _run_module_as_main
mod_name, loader, code, fname = _get_module_details(mod_name)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 101, in _get_module_details
loader = get_loader(mod_name)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pkgutil.py", line 464, in get_loader
return find_loader(fullname)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pkgutil.py", line 474, in find_loader
for importer in iter_importers(fullname):
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pkgutil.py", line 430, in iter_importers
__import__(pkg)
File "WriteToBigQuery.py", line 49, in <module>
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(str(PROJECT_ID + ':' + pipeline_options.write_file), schema = schema))
File "/Users/mayansalama/Documents/GCP/gcloud_env/lib/python2.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1337, in __init__
self.table_reference = _parse_table_reference(table, dataset, project)
File "/Users/mayansalama/Documents/GCP/gcloud_env/lib/python2.7/site-packages/apache_beam/io/gcp/bigquery.py", line 309, in _parse_table_reference
if isinstance(table, bigquery.TableReference):
AttributeError: 'module' object has no attribute 'TableReference'
It looks like some import is going wrong somewhere; could it be caused by using the GoogleCloudOptions pipeline options?
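For reference, the attribute named in the traceback lives in Beam's bundled BigQuery client module. A minimal probe, using hypothetical project/dataset/table names and assuming the class is only present when Beam's GCP extras are installed, might look like this:

# Hypothetical names; a quick probe of the class the traceback says is missing.
# Assumption: Beam's internal BigQuery client only exposes TableReference when
# the GCP extras are installed (e.g. via `pip install apache-beam[gcp]`).
from apache_beam.io.gcp.internal.clients import bigquery

ref = bigquery.TableReference(projectId='my-project',
                              datasetId='tmp',
                              tableId='dummy')
print(ref)

If this probe raises the same AttributeError, the environment rather than the pipeline code is the likely culprit.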
Solution:
I did some testing and could not reproduce your issue. Does the dataset already exist? The following snippet works for me (I'm posting it as an answer to format it better):
import apache_beam as beam
import sys

PROJECT = 'PROJECT_ID'
BUCKET = 'BUCKET_NAME'

schema = 'id:INTEGER,region:STRING'

class Split(beam.DoFn):
    # Parse one CSV line into a dict matching the BigQuery schema
    def process(self, element):
        id, region = element.split(",")
        return [{
            'id': int(id),
            'region': region,
        }]

def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner'
    ]

    p = beam.Pipeline(argv=argv)

    (p
     | 'ReadFromGCS' >> beam.io.textio.ReadFromText('gs://{0}/staging/dummy.csv'.format(BUCKET))
     | 'ParseCSV' >> beam.ParDo(Split())
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:test.dummy'.format(PROJECT), schema=schema)
    )

    p.run()

if __name__ == '__main__':
    run()
where dummy.csv contains:
$cat dummy.csv
1,us-central1
2,europe-west1
and the resulting rows appear in the test.dummy table in BigQuery.
These were the relevant dependencies used:
apache-beam==2.4.0
google-cloud-bigquery==0.25.0
google-cloud-dataflow==2.4.0
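If the same AttributeError still appears, one hedged check (an assumption based on the traceback, not something stated in the answer) is to confirm that apache-beam was installed with its GCP extras, for example pip install apache-beam[gcp]==2.4.0, and that the installed versions match the list above:

# Sketch: print the installed versions to compare against the list above.
# Assumption: the AttributeError typically goes away once apache-beam is
# installed with its GCP extras, e.g. `pip install apache-beam[gcp]==2.4.0`.
import pkg_resources

for pkg in ('apache-beam', 'google-cloud-bigquery', 'google-cloud-dataflow'):
    print(pkg, pkg_resources.get_distribution(pkg).version)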