How to create a Dataflow pipeline from Pub/Sub to GCS in Python
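I want to use Dataflow to move data from Pub/Sub to GCS.
Basically, I want Dataflow to accumulate messages for a fixed amount of time (for example, 15 minutes) and then, once that time has elapsed, write the data to GCS as a text file.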
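To make the intended grouping concrete, here is a minimal plain-Python sketch (no Beam involved) of how a fixed window could assign each message timestamp to a 15-minute bucket and join the bucket's messages into one text blob. The names `fixed_window_bounds` and `bucket_messages` are hypothetical and exist only for this illustration. The sketch ignores watermarks, late data, and window offsets, all of which a real streaming runner has to handle.

```python
from collections import defaultdict

WINDOW_SECONDS = 15 * 60  # 15-minute windows, matching the example in the question


def fixed_window_bounds(timestamp, size=WINDOW_SECONDS):
    """Return the [start, end) bounds of the fixed window containing timestamp."""
    start = timestamp - (timestamp % size)
    return start, start + size


def bucket_messages(messages, size=WINDOW_SECONDS):
    """Group (timestamp, text) pairs by window and join each group with newlines."""
    buckets = defaultdict(list)
    for timestamp, text in messages:
        buckets[fixed_window_bounds(timestamp, size)].append(text)
    # One newline-joined string per window, ordered by window start.
    return {window: "\n".join(lines) for window, lines in sorted(buckets.items())}


if __name__ == "__main__":
    sample = [(0, "a"), (899, "b"), (900, "c")]
    for window, body in bucket_messages(sample).items():
        print(window, repr(body))
```

In the pipeline, each such bucket would correspond to one output file written when its window closes.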
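My ultimate goal is to build a custom pipeline, so the "Pub/Sub to Cloud Storage" template is not enough for me. I also don't know Java at all, which is why I started working on this in Python.
Here is what I have so far (Apache Beam Python SDK 2.10.0):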
import apache_beam as beam

TOPIC_PATH = "projects/<my-project>/topics/<my-topic>"


def CombineFn(e):
    # Join all elements of one window into a single newline-separated string.
    return "\n".join(e)


o = beam.options.pipeline_options.PipelineOptions()
p = beam.Pipeline(options=o)
data = (
    p
    | "Read From Pub/Sub" >> beam.io.ReadFromPubSub(topic=TOPIC_PATH)
    # Split the unbounded stream into 30-second fixed windows.
    | "Window" >> beam.WindowInto(beam.window.FixedWindows(30))
    | "Combine" >> beam.transforms.core.CombineGlobally(CombineFn).without_defaults()
    | "Output" >> beam.io.WriteToText("<GCS path or local path>")
)
res = p.run()
res.wait_until_finish()
Running this program in my local environment works without problems.
python main.py
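Although it runs locally, it reads from the specified Pub/Sub topic and writes to the specified GCS path every 30 seconds, as expected.
The problem is that when I run it on Google Cloud Platform (that is, on Cloud Dataflow), it keeps throwing a cryptic exception.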
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1096: Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 148, in _execute
response = task()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 183, in <lambda>
self._execute(lambda: worker.do_instruction(work), work)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in do_instruction
request.instruction_id)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 272, in process_bundle
bundle_processor.process_bundle(instruction_id)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 494, in process_bundle
op.finish()
File "apache_beam/runners/worker/operations.py", line 506, in apache_beam.runners.worker.operations.DoOperation.finish
def finish(self):
File "apache_beam/runners/worker/operations.py", line 507, in apache_beam.runners.worker.operations.DoOperation.finish
with self.scoped_finish_state:
File "apache_beam/runners/worker/operations.py", line 508, in apache_beam.runners.worker.operations.DoOperation.finish
self.dofn_runner.finish()
File "apache_beam/runners/common.py", line 703, in apache_beam.runners.common.DoFnRunner.finish
self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
File "apache_beam/runners/common.py", line 697, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 722, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 695, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
bundle_method()
File "apache_beam/runners/common.py", line 361, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
def invoke_finish_bundle(self):
File "apache_beam/runners/common.py", line 364, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
self.output_processor.finish_bundle_outputs(
File "apache_beam/runners/common.py", line 832, in apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 87, in apache_beam.runners.worker.operations.ConsumerSet.receive
self.update_counters_start(windowed_value)
File "apache_beam/runners/worker/operations.py", line 93, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
self.opcounter.update_from(windowed_value)
File "apache_beam/runners/worker/opcounters.py", line 195, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
self.do_sample(windowed_value)
File "apache_beam/runners/worker/opcounters.py", line 213, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
self.coder_impl.get_estimated_size_and_observables(windowed_value))
File "apache_beam/coders/coder_impl.py", line 953, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
def get_estimated_size_and_observables(self, value, nested=False):
File "apache_beam/coders/coder_impl.py", line 969, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
self._windows_coder.estimate_size(value.windows, nested=True))
File "apache_beam/coders/coder_impl.py", line 758, in apache_beam.coders.coder_impl.SequenceCoderImpl.estimate_size
self.get_estimated_size_and_observables(value))
File "apache_beam/coders/coder_impl.py", line 772, in apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
self._elem_coder.get_estimated_size_and_observables(
File "apache_beam/coders/coder_impl.py", line 134, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
return self.estimate_size(value, nested), []
File "apache_beam/coders/coder_impl.py", line 458, in apache_beam.coders.coder_impl.IntervalWindowCoderImpl.estimate_size
typed_value = value
TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase [while running 'generatedPtransform-1090']
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:280)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:84)
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:130)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1233)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:144)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:972)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -1096: Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 148, in _execute
response = task()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 183, in <lambda>
self._execute(lambda: worker.do_instruction(work), work)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in do_instruction
request.instruction_id)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 272, in process_bundle
bundle_processor.process_bundle(instruction_id)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 494, in process_bundle
op.finish()
File "apache_beam/runners/worker/operations.py", line 506, in apache_beam.runners.worker.operations.DoOperation.finish
def finish(self):
File "apache_beam/runners/worker/operations.py", line 507, in apache_beam.runners.worker.operations.DoOperation.finish
with self.scoped_finish_state:
File "apache_beam/runners/worker/operations.py", line 508, in apache_beam.runners.worker.operations.DoOperation.finish
self.dofn_runner.finish()
File "apache_beam/runners/common.py", line 703, in apache_beam.runners.common.DoFnRunner.finish
self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
File "apache_beam/runners/common.py", line 697, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 722, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 695, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
bundle_method()
File "apache_beam/runners/common.py", line 361, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
def invoke_finish_bundle(self):
File "apache_beam/runners/common.py", line 364, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
self.output_processor.finish_bundle_outputs(
File "apache_beam/runners/common.py", line 832, in apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 87, in apache_beam.runners.worker.operations.ConsumerSet.receive
self.update_counters_start(windowed_value)
File "apache_beam/runners/worker/operations.py", line 93, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
self.opcounter.update_from(windowed_value)
File "apache_beam/runners/worker/opcounters.py", line 195, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
self.do_sample(windowed_value)
File "apache_beam/runners/worker/opcounters.py", line 213, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
self.coder_impl.get_estimated_size_and_observables(windowed_value))
File "apache_beam/coders/coder_impl.py", line 953, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
def get_estimated_size_and_observables(self, value, nested=False):
File "apache_beam/coders/coder_impl.py", line 969, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
self._windows_coder.estimate_size(value.windows, nested=True))
File "apache_beam/coders/coder_impl.py", line 758, in apache_beam.coders.coder_impl.SequenceCoderImpl.estimate_size
self.get_estimated_size_and_observables(value))
File "apache_beam/coders/coder_impl.py", line 772, in apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
self._elem_coder.get_estimated_size_and_observables(
File "apache_beam/coders/coder_impl.py", line 134, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
return self.estimate_size(value, nested), []
File "apache_beam/coders/coder_impl.py", line 458, in apache_beam.coders.coder_impl.IntervalWindowCoderImpl.estimate_size
typed_value = value
TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase [while running 'generatedPtransform-1090']
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
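The exception above appears, without blocking the job, every time the pipeline tries to write to GCS.
The result is that each time it tries to output, a new text file is indeed generated, but its content is always the same as the first window's output. That is obviously not what I want.
The exception is nested so deeply in the stack trace that it is hard to guess the root cause, and I have no idea why the pipeline works fine on DirectRunner but not on DataflowRunner.
It seems to say that somewhere in the pipeline a globally windowed value is being converted to a non-globally windowed one, even though I apply a non-global window transform in the second stage of the pipeline. Adding a custom trigger did not help.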
Answer:
I ran into the same error and found a workaround, but not a fix:
TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase [while running 'test-file-out/Write/WriteImpl/WriteBundles']
I saw this both when running locally with DirectRunner and when running on Dataflow with DataflowRunner.
Reverting to apache-beam[gcp]==2.9.0 lets my pipeline run as expected.
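As a rough illustration of that workaround, a launcher script could refuse to submit the job when the installed SDK is a version reported as broken. The sketch below is an assumption-laden example, not an official check: `KNOWN_BAD`, `parse_version`, and `is_known_bad` are hypothetical names, and the only version listed is 2.10.0, because that is the only one this thread reports as failing.

```python
import re

# Assumption: only the version reported in this thread is listed here.
KNOWN_BAD = {(2, 10, 0)}


def parse_version(text):
    """Extract (major, minor, patch) from a version string such as '2.10.0'."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", text)
    if match is None:
        raise ValueError("unrecognized version string: %r" % (text,))
    return tuple(int(part) for part in match.groups())


def is_known_bad(version_text):
    """Return True if the version is one reported in this thread as failing."""
    return parse_version(version_text) in KNOWN_BAD
```

With Beam installed, this could be called as `is_known_bad(apache_beam.__version__)` before building the pipeline.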