How do I use Google protocol buffers in a multiprocess script?

My use case is:

  • pulling data from the new Google Ads API
  • appending the objects with metadata
  • modelling using the objects
  • pushing the results to a database

AdWords Campaign Wrapper Object

I have an existing process for the old AdWords API, where I pull the data and store it in custom classes, e.g.

class Campaign(Represantable):     def __init__(self, id, managed_customer_id, base_campaign_id, name, status, serving_status):         self.id = id         self.managed_customer_id = managed_customer_id         self.base_campaign_id = base_campaign_id         self.name = name         self.status = status         self.serving_status = serving_status @classmethod     def from_zeep(cls, campaign, managed_customer_id):         return cls(             campaign.id,             managed_customer_id,             campaign.baseCampaignId,             campaign.name,             campaign.status,             campaign.servingStatus         ) 

Multiprocessing script

If I want to pull campaigns from a dozen accounts, I can run the scripts that populate the Campaign objects in parallel using pathos (again code simplified for this example):

import multiprocessing as mp from pathos.pools import ProcessPool class WithParallelism(object):     def __init__(self, parallelism_level):         self.parallelism_level = parallelism_level     def _parallel_apply(self, fn, collection, **kwargs):         pool = ProcessPool(             nodes=self.parallelism_level         )                  # this is to prevent Python from printing large traces when user interrupts execution (e.g. Ctrl+C)         def keyboard_interrupt_wrapper_fn(*args_wrapped):             try:                 return fn(*args_wrapped, **kwargs)             except KeyboardInterrupt:                 pass             except Exception as err:                 return err         errors = pool.map(keyboard_interrupt_wrapper_fn, collection)         return error 

Google Ads Campaign Wrapper Object

With the new API, I planned to store the protobuf object within my class, and use pointers to access the objects attributes. My class is a lot more complex than the example, using descriptors and subclass init for the attributes, but for simplicity it's effectively something like this:

class Campaign(Proto):     def __init__(self, **kwargs):         if "proto" in kwargs:             self._proto = kwargs['proto']         if "parent" in kwargs:             self._parent = kwargs['parent']         self._init_metadata(**kwargs) @property     def id(self):         return self._proto.id.value @property     def name(self):         return self._proto.name.value    ... 

This has the added advantage of being able to traverse the parent Google Ads object, to extract data from that protobuf object.

However, when I run my script of to populate these new objects in parallel, I get a pickle error. I understand that multiprocess uses pickle to serialize the objects, and one of the key advantages of protobuf objects is that they can be easily serialized.

How should I go about pulling the new Google Ads data in parallel:

  • Should I be serializing and deserializing the data in the Campaign object using SerializeToString
  • Should I just be extracting and storing the scalar data (id, name) like how I did with AdWords
  • Is there an entirely different approach?

Tag:google-ads-api, python, protocol-buffers, multithreading

Add a new comment.