How do I use Google protocol buffers in a multiprocess script?
My use case is:
- pulling data from the new Google Ads API
- appending the objects with metadata
- modelling using the objects
- pushing the results to a database
AdWords Campaign Wrapper Object
I have an existing process for the old AdWords API, where I pull the data and store it in custom classes, e.g.
```python
class Campaign(Represantable):
    def __init__(self, id, managed_customer_id, base_campaign_id, name, status, serving_status):
        self.id = id
        self.managed_customer_id = managed_customer_id
        self.base_campaign_id = base_campaign_id
        self.name = name
        self.status = status
        self.serving_status = serving_status

    @classmethod
    def from_zeep(cls, campaign, managed_customer_id):
        return cls(
            campaign.id,
            managed_customer_id,
            campaign.baseCampaignId,
            campaign.name,
            campaign.status,
            campaign.servingStatus
        )
```
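Every attribute of this class is a plain scalar copied out of the zeep response, so instances pickle without any extra work. That is presumably why the AdWords version runs in parallel without trouble. The quick check below is a sketch under two assumptions: `SimpleNamespace` is a hypothetical stand-in for the zeep campaign object, and the `Represantable` base is dropped because its definition isn't shown.

```python
import pickle
from types import SimpleNamespace


class Campaign:
    # Simplified copy of the class above; the Represantable base is omitted.
    def __init__(self, id, managed_customer_id, base_campaign_id, name,
                 status, serving_status):
        self.id = id
        self.managed_customer_id = managed_customer_id
        self.base_campaign_id = base_campaign_id
        self.name = name
        self.status = status
        self.serving_status = serving_status

    @classmethod
    def from_zeep(cls, campaign, managed_customer_id):
        return cls(campaign.id, managed_customer_id, campaign.baseCampaignId,
                   campaign.name, campaign.status, campaign.servingStatus)


# Hypothetical stand-in for a zeep campaign object (attribute names as above).
zeep_campaign = SimpleNamespace(id=101, baseCampaignId=100, name="Brand",
                                status="ENABLED", servingStatus="SERVING")
campaign = Campaign.from_zeep(zeep_campaign, managed_customer_id=555)

# Worker processes hand results back via pickle; plain scalars survive as-is.
restored = pickle.loads(pickle.dumps(campaign))
print(restored.name, restored.managed_customer_id)  # Brand 555
```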
Multiprocessing script
If I want to pull campaigns from a dozen accounts, I can run the scripts that populate the `Campaign` objects in parallel using pathos (again, code simplified for this example):
```python
import multiprocessing as mp

from pathos.pools import ProcessPool


class WithParallelism(object):
    def __init__(self, parallelism_level):
        self.parallelism_level = parallelism_level

    def _parallel_apply(self, fn, collection, **kwargs):
        pool = ProcessPool(
            nodes=self.parallelism_level
        )

        # this is to prevent Python from printing large traces when user interrupts execution (e.g. Ctrl+C)
        def keyboard_interrupt_wrapper_fn(*args_wrapped):
            try:
                return fn(*args_wrapped, **kwargs)
            except KeyboardInterrupt:
                pass
            except Exception as err:
                return err

        # one entry per item: fn's return value, or the exception it raised
        errors = pool.map(keyboard_interrupt_wrapper_fn, collection)
        return errors
```
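The nested `keyboard_interrupt_wrapper_fn` works because pathos serializes with `dill`. The standard library's `pickle` refuses functions defined inside another function. If you ever swap pathos for `multiprocessing.Pool`, a module-level callable class is one way to keep the same catch-and-return behaviour. This is a minimal sketch; `CatchErrors`, `fetch_campaigns` and `make_nested_wrapper` are hypothetical names.

```python
import pickle


class CatchErrors:
    """Picklable stand-in for keyboard_interrupt_wrapper_fn (hypothetical)."""

    def __init__(self, fn, **kwargs):
        self.fn = fn
        self.kwargs = kwargs

    def __call__(self, item):
        try:
            return self.fn(item, **self.kwargs)
        except KeyboardInterrupt:
            return None
        except Exception as err:
            # Return the exception instead of raising it, as the original does.
            return err


def fetch_campaigns(account_id, prefix="campaign"):
    # Hypothetical worker; a real one would query the API for account_id.
    if account_id < 0:
        raise ValueError(f"invalid account id: {account_id}")
    return [f"{prefix}-{account_id}-{i}" for i in range(2)]


def make_nested_wrapper(fn):
    # Same shape as the original: a function defined inside another function.
    def wrapper(item):
        return fn(item)
    return wrapper


wrapped = CatchErrors(fetch_campaigns, prefix="c")
results = [wrapped(account_id) for account_id in (1, -1)]
print(results[0])  # ['c-1-0', 'c-1-1']

# The class instance pickles with the standard library; the nested function does not.
pickle.dumps(wrapped)
try:
    pickle.dumps(make_nested_wrapper(fetch_campaigns))
    nested_picklable = True
except (pickle.PicklingError, AttributeError, TypeError):
    nested_picklable = False
print(nested_picklable)  # False
```

With the standard library, `multiprocessing.Pool(n).map(wrapped, account_ids)` would then be possible, as long as `fn` and everything it returns are picklable too, since results travel back through pickle as well.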
Google Ads Campaign Wrapper Object
With the new API, I planned to store the protobuf object within my class and use properties as pointers to the object's attributes. My real class is a lot more complex than this example, using descriptors and `__init_subclass__` for the attributes, but for simplicity it's effectively something like this:
```python
class Campaign(Proto):
    def __init__(self, **kwargs):
        if "proto" in kwargs:
            self._proto = kwargs['proto']
        if "parent" in kwargs:
            self._parent = kwargs['parent']
        self._init_metadata(**kwargs)

    @property
    def id(self):
        return self._proto.id.value

    @property
    def name(self):
        return self._proto.name.value

    ...
```
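For reference, the descriptor and `__init_subclass__` approach mentioned above could look roughly like the sketch below. It rests on a few assumptions. `ProtoField`, `ProtoWrapper` and `to_dict` are hypothetical names. `SimpleNamespace` objects stand in for the protobuf message, whose fields are read through `.value` as in the class above. Collecting the field names in `__init_subclass__` also gives a cheap way to pull out plain scalar values when they're needed.

```python
from types import SimpleNamespace


class ProtoField:
    """Descriptor that reads <instance>._proto.<attribute name>.value."""

    def __set_name__(self, owner, name):
        self.name = name

    def __get__(self, instance, owner=None):
        if instance is None:
            return self  # accessed on the class itself
        return getattr(instance._proto, self.name).value


class ProtoWrapper:
    _fields = ()

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Record which class attributes are ProtoField descriptors.
        cls._fields = tuple(
            name for name, attr in vars(cls).items() if isinstance(attr, ProtoField)
        )

    def __init__(self, proto, parent=None):
        self._proto = proto
        self._parent = parent

    def to_dict(self):
        # Plain scalar values only; no reference to the message or the parent.
        return {name: getattr(self, name) for name in self._fields}


class Campaign(ProtoWrapper):
    id = ProtoField()
    name = ProtoField()


# Hypothetical stand-in for a campaign message with wrapped scalar fields.
proto = SimpleNamespace(id=SimpleNamespace(value=42),
                        name=SimpleNamespace(value="Brand"))
campaign = Campaign(proto)
print(campaign.id, campaign.name)  # 42 Brand
print(campaign.to_dict())  # {'id': 42, 'name': 'Brand'}
```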
This has the added advantage of being able to traverse the parent Google Ads object to extract data from that protobuf object.
However, when I run my script to populate these new objects in parallel, I get a pickle error. I understand that `multiprocess` uses pickle to serialize the objects, and one of the key advantages of protobuf objects is that they can be easily serialized.
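One way to narrow down a pickle error like this is to try pickling each attribute of a failing object on its own. Pickle follows every reference, so a `_parent` link drags the whole parent object along with each result, and anything unpicklable the parent holds breaks the child too. This is a small diagnostic sketch. `find_unpicklable` and `FakeSearchResponse` are hypothetical. The generator inside `FakeSearchResponse` only stands in for whatever live, non-serializable state the real parent might hold; the actual traceback would show what that is.

```python
import pickle


def find_unpicklable(obj):
    """Map each instance attribute that pickle rejects to the error raised."""
    problems = {}
    for name, value in vars(obj).items():
        try:
            pickle.dumps(value)
        except Exception as err:  # pickle raises several exception types
            problems[name] = f"{type(err).__name__}: {err}"
    return problems


class FakeSearchResponse:
    """Hypothetical parent object that keeps a live generator around."""

    def __init__(self, rows):
        self.rows = rows
        self._stream = (row for row in rows)  # generators cannot be pickled


class Campaign:
    def __init__(self, proto, parent=None):
        self._proto = proto
        self._parent = parent


rows = [{"id": 42, "name": "Brand"}]
campaign = Campaign(proto=rows[0], parent=FakeSearchResponse(rows))
print(find_unpicklable(campaign))          # only '_parent' is reported
print(find_unpicklable(campaign._parent))  # drill down: '_stream' is the culprit
```

The helper checks one level at a time. Calling it again on the reported attribute walks down the chain until the offending object turns up.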
How should I go about pulling the new Google Ads data in parallel?
- Should I be serializing and deserializing the data in the `Campaign` object using `SerializeToString`?
- Should I just be extracting and storing the scalar data (`id`, `name`) like I did with AdWords?
- Is there an entirely different approach?
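Here is a sketch of the first option. The wrapper can control what pickle sends by defining `__getstate__` and `__setstate__`. It ships only the message bytes from `SerializeToString()`, rebuilds with the generated class's `FromString()`, and deliberately drops the `_parent` link. To keep the sketch runnable without the Google Ads library, `FakeCampaignMessage` is a hypothetical stand-in that mimics just those two methods with JSON. The wrapper part assumes raw protobuf messages, which provide both methods.

```python
import json
import pickle


class FakeCampaignMessage:
    """Hypothetical stand-in for a generated protobuf message class."""

    def __init__(self, id=0, name=""):
        self.id = id
        self.name = name

    def SerializeToString(self):
        return json.dumps({"id": self.id, "name": self.name}).encode()

    @classmethod
    def FromString(cls, data):
        return cls(**json.loads(data))


class Campaign:
    def __init__(self, proto, parent=None):
        self._proto = proto
        self._parent = parent

    # The stand-in stores plain values, so there is no `.value` here.
    @property
    def id(self):
        return self._proto.id

    @property
    def name(self):
        return self._proto.name

    def __getstate__(self):
        # Send only the message class and its bytes; leave the parent behind.
        return {
            "proto_cls": type(self._proto),
            "proto_bytes": self._proto.SerializeToString(),
        }

    def __setstate__(self, state):
        self._proto = state["proto_cls"].FromString(state["proto_bytes"])
        self._parent = None  # parent data is not available after unpickling


# The lambda stands in for an unpicklable parent; it is never serialized.
campaign = Campaign(FakeCampaignMessage(42, "Brand"), parent=lambda: None)
restored = pickle.loads(pickle.dumps(campaign))
print(restored.id, restored.name, restored._parent)  # 42 Brand None
```

The trade-off is that anything read through `_parent` has to be copied onto the wrapper (or into its metadata) before it crosses the process boundary. A variant without custom pickling follows the same idea: the worker could return the serialized bytes, or plain dicts of scalars, and the main process would rebuild the wrappers.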