Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share

I'm new to using Apache Beam in Python with the Dataflow runner. I want to create a batch pipeline that publishes to Google Cloud Pub/Sub. After tinkering with the Beam Python APIs I found a solution, but along the way I ran into some interesting problems that made me curious.

1. The Successful Pipeline

My working Beam pipeline for publishing data from GCS in batch looks like this:

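import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        super().__init__()
        self.topic_path = topic_path

    def process(self, element, **kwargs):
        from google.cloud import pubsub_v1
        # A new client is created for every element.
        publisher = pubsub_v1.PublisherClient()
        future = publisher.publish(self.topic_path, data=element.encode("utf-8"))
        # Yield the message ID: process() should produce an iterable of outputs,
        # and a bare string would be treated as an iterable of characters.
        yield future.result()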


def run_gcs_to_pubsub(argv):
    options = PipelineOptions(flags=argv)

    from datapipes.common.dataflow_utils import CsvFileSource
    from datapipes.protos import proto_schemas_pb2
    from google.protobuf.json_format import MessageToJson, ParseDict

    with beam.Pipeline(options=options) as p:
        normalized_data = (
                p |
                "Read CSV from GCS" >> beam.io.Read(CsvFileSource(
                    "gs://bucket/path/to/file.csv")) |
                "Normalize to Proto Schema" >> beam.Map(
                        # ParseDict is an assumption: it fills MySchema from a dict-like row.
                        lambda data: MessageToJson(
                            ParseDict(data, proto_schemas_pb2.MySchema()),
                            indent=0,
                            preserving_proto_field_name=True)
                    )
        )
        (normalized_data |
            "Write to PubSub" >> beam.ParDo(
                    PublishFn(topic_path="projects/my-gcp-project/topics/mytopic"))
            )

2. The Unsuccessful Pipelines

Here I tried to share a single publisher across DoFn calls. I tried the following approaches.

a. Initializing the publisher in the DoFn

class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        from google.cloud import pubsub_v1

        super().__init__()
        batch_settings = pubsub_v1.types.BatchSettings(
            max_bytes=1024,  # One kilobyte
            max_latency=1,  # One second
        )
        self.publisher = pubsub_v1.PublisherClient(batch_settings)
        self.topic_path = topic_path

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        yield future.result()

def run_gcs_to_pubsub(argv):
    ... ## same as 1

b. Initializing the publisher outside the DoFn and passing it to the DoFn

class PublishFn(beam.DoFn):
    def __init__(self, publisher, topic_path):
        super().__init__()
        self.publisher = publisher
        self.topic_path = topic_path

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        yield future.result()


def run_gcs_to_pubsub(argv):
    .... ## same as 1

    from google.cloud import pubsub_v1

    batch_settings = pubsub_v1.types.BatchSettings(
        max_bytes=1024,  # One kilobyte
        max_latency=1,  # One second
    )
    publisher = pubsub_v1.PublisherClient(batch_settings)

    with beam.Pipeline(options=options) as p:
        ... # same as 1
        (normalized_data | 
            "Write to PubSub" >> beam.ParDo(
                PublishFn(publisher=publisher, topic_path="projects/my-gcp-project/topics/mytopic"))
        )

Both attempts to share the publisher across DoFn calls failed with the following error messages:

  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__

and

  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__

My questions would be:

  1. Would a shared publisher improve the Beam pipeline's performance? If yes, I would like to explore this solution.

  2. Why do the errors occur in my failing pipelines? Is it because I initialize a custom object outside the process function and pass it to the DoFn? If so, how can I implement a pipeline in which I can reuse a custom object in a DoFn?

Thank you, your help would be greatly appreciated.

Edit: the Solution

Okay, so Ankur has explained why my problem occurs and how a DoFn is serialized. Based on this, I now understand that there are two ways to make a custom object shared/reusable in a DoFn:

  1. Make the custom object serializable: this allows the object to be created in the DoFn's __init__. It must be serializable because the DoFn object is created (which calls __init__) and then serialized during pipeline submission. How to achieve this is covered in my answer below. I also found that this requirement is described in the Beam documentation [1][2].

  2. Initialize non-serializable objects in DoFn methods other than __init__. This avoids serialization, because those methods aren't called during pipeline submission. How to do this is explained in Ankur's answer.
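
To make the two options concrete, here is a minimal sketch that needs neither Beam nor Pub/Sub. It rests on several assumptions: the standard-library pickle module stands in for Beam's own pickler, a threading.Lock stands in for the unpicklable gRPC channel, and FakeClient, EagerFn, LazyFn and PicklableWrapper are hypothetical names invented for illustration. In a real pipeline the setup hook would be DoFn.setup (or start_bundle), which runs on the worker after the DoFn has been deserialized.

```python
import pickle
import threading


class FakeClient:
    """Hypothetical stand-in for pubsub_v1.PublisherClient.

    The lock mimics the gRPC channel: it cannot be pickled.
    """

    def __init__(self):
        self._channel = threading.Lock()

    def publish(self, topic_path, data):
        return f"{topic_path}:{data.decode('utf-8')}"


class EagerFn:
    """Fails: the client is created in __init__, so it is pickled with the object."""

    def __init__(self, topic_path):
        self.topic_path = topic_path
        self.client = FakeClient()


class LazyFn:
    """Option 2: create the client in a hook that runs after unpickling."""

    def __init__(self, topic_path):
        self.topic_path = topic_path
        self.client = None

    def setup(self):
        # Plays the role of DoFn.setup: called once on the worker, not at submission.
        self.client = FakeClient()

    def process(self, element):
        yield self.client.publish(self.topic_path, element.encode("utf-8"))


class PicklableWrapper:
    """Option 1: make the holder serializable by dropping the client when pickled."""

    def __init__(self, topic_path):
        self.topic_path = topic_path
        self.client = FakeClient()

    def __getstate__(self):
        # Serialize only the configuration, never the live client.
        return {"topic_path": self.topic_path}

    def __setstate__(self, state):
        self.topic_path = state["topic_path"]
        self.client = FakeClient()  # rebuilt on the receiving side


def roundtrip(obj):
    """Pickle and unpickle, roughly what happens between submission and worker."""
    return pickle.loads(pickle.dumps(obj))


topic = "projects/my-gcp-project/topics/mytopic"

try:
    roundtrip(EagerFn(topic))
    eager_error = None
except TypeError as exc:
    eager_error = exc

lazy = roundtrip(LazyFn(topic))
lazy.setup()
lazy_out = list(lazy.process("hello"))

wrapped = roundtrip(PicklableWrapper(topic))
wrapped_out = wrapped.client.publish(wrapped.topic_path, b"hello")
```

EagerFn fails the round trip in the same way as my attempts a and b, while LazyFn and PicklableWrapper survive it because only the topic path is ever serialized. With either pattern the client is created once per deserialized DoFn instance rather than once per element.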

References:

[1] https://beam.apache.org/documentation/programming-guide/#core-beam-transforms

[2] https://beam.apache.org/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms
