Transform Pub/Sub to Firestore Database in GCP

Rusty Divine Apache, Cloud, Data Science, Python Leave a Comment

This year, the client I work with has started exploring the offerings in Google Cloud Platform (GCP) after investing years into the Microsoft Azure cloud. The opportunity has allowed me to explore a few new technologies that this post will cover and that you will hopefully find interesting.

In short, I was tasked with transforming Pub/Sub to Firestore Database in GCP. The scenario explored in this post takes an FHIR healthcare data feed, extracts telephone and email information, and then stores that in a Firestore database.

Google Dataflow with Apache Beam is a great solution for processing very large files, database tables, or streams of data in order to transform or summarize the data. You could imagine solutions for video game leaderboards, Internet of Things (IoT) analytics, overnight batch processes, high throughput event handling, and more.

The offerings in GCP are extensive, and this post will focus on a few of these technologies:

  • Pub/Sub: a messaging service we’ll use to simulate subscribing to an FHIR-formatted feed
  • Dataflow: a streaming or batch data processing service we’ll use to transform the FHIR messages to another format
    • Apache Beam: the framework we’ll use to create a data processing pipeline
  • Firestore: a NoSQL database where we’ll store the transformed messages

Architecture & Setup

transform pub/sub to firestore database in GCP

The design for this example scenario is fairly straightforward. The idea is to take messages that adhere to the FHIR format that was published from a Healthcare API, extract some communication information, and save that to a Firestore database.

If you would like to take it further, then I recommend reading this 4-part series of posts that create an FHIR API in GCP that could be the basis of the Pub/Sub that this post starts with.

If you would like to try the code, too, then you will need to create a Firestore database. If you log into GCP, create or select a project, and search for Firestore, you will be led through the setup process. You can run the Python/Apache Beam code locally, but if you want to deploy it to Dataflow, then you will also need to enable Dataflow and configure permissions for yourself or a service account.

The code for this demonstration can be downloaded from this repository.

You can follow these steps in the Readme file to get your environment configured if you would like to follow along.

Read Pub/Sub Messages with Apache Beam

Full demonstration code can be reviewed in this repository

This is the code the demonstration will begin with in order to read messages, log something about the message, and split the incoming feed by payload type:

    with Pipeline(options=pipeline_options) as pipeline:
        sub = pipeline_options.input_sub

        NameOnly, FullResource, UnknownPayload = (
            pipeline
            | "Read from Pub/Sub" >> io.ReadFromPubSub(subscription=sub, with_attributes=True)
            | "Log the message" >> beam.ParDo(LogIncoming())
            | "Partition by payloadType" >> beam.Partition(partition_payload_type, len(payloadTypes))
        )

This snippet creates a pipeline in line 1 and then gets the subscription name from the arguments on line 2.

Line 4 breaks out the values that are returned from the following block of code, specifically from the last step that partitions by payload type. Line 6 starts reading from the Pub/Sub, and then line 7 logs each message.

The way Apache Beam works is that each step in the pipeline is separated by the pipe character (“|”). Each step takes an immutable collection of elements, processes the entire collection in a parallel/distributed manner, and outputs another immutable collection for the next step.

This post is working with what’s called an unbounded collection, meaning elements keep coming in all the time to process. A bounded collection would be like a file or database table where there is a set number of elements to process.

For an unbounded collection, you can use what’s called Windowing, which can be based on time or other factors. Imagine you set up a window for 30 seconds and receive 100 messages in that time. That window is then processed as one bounded group, and you can perform grouping for deduplication or analytics for averages and counts.

This post does not get into windowing and grouping, but you can read more on Apache’s website.

Line 7 logs the element and returns it unaffected, and then line 8 splits the collection into three separate collections by payload type: NameOnly, FullResource, and UnknownPayload.

payloadTypes = ['NameOnly','FullResource', 'UnknownPayload']
def partition_payload_type(payload, len):
    if payload is None:
        return payloadTypes.index('UnknownPayload')
    
    try:
        payloadType = payload.attributes.get("payloadType")
        if payloadType in payloadTypes:
            return payloadTypes.index(payloadType)
        else:
            return payloadTypes.index('UnknownPayload')
    except Exception as e:
        logging.exception(f'partition_payload_type: {str(e)}!')
        return payloadTypes.index('UnknownPayload')

Processing Payload Types

Now, we’re one step closer to transforming pub/sub to firestore database in GCP. We have three different collections corresponding to our payload types (NameOnly, FullResource, and UnknownPayload). We’re expecting the NameOnly collection to have items when the Healthcare API can’t provide the full resource due to load conditions.

We would need to look these up individually and then combine those results with the FullResource collection in the future. For now, we will just log information about the NameOnly and UnknownPayload types:

        nameOnlyLookup = (
                # Look these up by id in the Healthcare API
                NameOnly | 'TODO: lookup name only payload types' >>  beam.Map(lambda x: logging.debug(f'NameOnlyLookup {x}'))
        )

        UnknownPayloadLog =  (
            UnknownPayload 
            | 'Log unrecognized payload types' >> beam.Map(lambda x: logging.warn(f'Unrecognized payload type {x}'))
        ) 

The FullResource payload type supplies all the information we need to extract the contact information for our patient.

        ContactInfoProcess = (
            FullResource | 'Filter by Person' >> beam.Filter(lambda x: partition_resource_type(x, len(resourceTypes)) == resourceTypes.index('Person'))
                         | 'Extract contact info' >> beam.ParDo(ExtractContactInfo())
                         | 'Save to Firestore' >> beam.ParDo(Save(), pipeline_options)
        )   

Line 2 filters out any elements of the incoming collection that are not of resource type “Person.” Line 3 then takes that filtered list and extracts the contact information that we will save to Firestore in line 4.

Notice on line 4, we pass in some extra parameters, but they don’t go into the arguments for the Save() method, they instead go into the arguments of the ParDo function.

class ExtractContactInfo(DoFn):
    def process(self, element):
        """Extracts contact info from a person or patient resource
        """
        contactInfo = {}
        contactInfo['id'] = 'Unknown'
        msg = json.loads(element.data.decode("utf-8"))

        try:
            # Identifiers (https://www.hl7.org/fhir/patient-definitions.html#Patient.identifier)
            if 'identifier' in msg:
                # Just for this example, using the first identifier as the id
                contactInfo['id'] = msg['identifier'][0]['value']
                
                identifiers = {}
                for identifier in msg['identifier']:
                    system = identifier['system']
                    value = identifier['value']
                    identifiers[system] = value
                contactInfo['identifiers'] = identifiers

            # Name (https://www.hl7.org/fhir/patient-definitions.html#Patient.name)
            if 'name' in msg:
                for name in msg['name']:
                    use = name['use']
                    if use == 'official':
                        contactInfo['legalLastName'] = name['family']
                        if 'given' in name:
                            contactInfo['legalFirstName'] = name['given'][0]
                            if (len(name['given']) > 1):
                                contactInfo['legalMiddleName'] = name['given'][1]

            # Email (https://www.hl7.org/fhir/patient-definitions.html#Patient.telecom)
            if 'telecom' in msg:
                for telecom in msg['telecom']:
                    if telecom['system'] == 'email':
                        contactInfo['email'] = telecom['value']
                        break

            # Phones (https://www.hl7.org/fhir/patient-definitions.html#Patient.telecom)
            if 'telecom' in msg:
                for telecom in msg['telecom']:
                    if telecom['system'] == 'phone':
                        use = telecom['use']
                        if use == 'home':
                            contactInfo['phoneHome'] = telecom['value']
                        elif use == 'mobile':
                            contactInfo['phoneMobile'] = telecom['value']
                        elif use == 'work':
                            contactInfo['phoneWork'] = telecom['value']
            
            
        except Exception as e:
            logging.exception(f'ExtractContactInfo: {str(e)}')
        
        yield contactInfo

Now, save to Firestore:

class Save(DoFn):
    def process(self, element, custom_options: CustomOptions):
        """Saves contact to Firestore
        """
        try:
            project = custom_options.get_all_options()['project']
            collection = 'ContactInfo'
            id = element['id']

            db = firestore.Client(project)
            # Upsert the contact info - merge=True to maintain older fields
            db.collection(collection).document(id).set(element, merge=True)

            logging.info(f'Updated contact info for {id}')

        except Exception as e:
            logging.exception(f'Save: {str(e)}')

        yield element

Running the Code

To run the code locally, you can run the debug from the debug menu after configuring your launch.json file (see readme).

To upload the job to Dataflow in GCP, run the following command:

python main.py \
--project YOUR_PROJECT_NAME \
--job_name YOUR_JOB_NAME \
--runner DataflowRunner \
--region us-east1 \ 
--temp_location YOUR_BUCKET_LOCATION \
--input_sub YOUR_INPUT_SUBSCRIPTION \
--setup_file .\setup.py \
--service_account_email=YOUR_SERVICE_ACCOUNT_IF_USING

You should see a job created in Dataflow that looks like:

Notice that each box in the job corresponds to one of the piped tasks in the code. The processing for each of those boxes can be spread out amongst many servers and GCP takes care of managing how many servers to allocate to get the job done efficiently.

The right panel will tell you information about the metrics it is consuming in order to complete the job. It will keep running even after you stop your local process, so make sure to also stop the job in GCP Dataflow.

You should see elements appear in your Firestore database like:

pub/sub to firestore database in GCP

Wrap Up

You can process data efficiently even when you have very large datasets by utilizing Google Dataflow with Apache Beam to distribute each processing step in parallel.

This introductory demonstration showed how to transform Pub/Sub to Firestore Database in GCP. We showed you one example of processing data from a Healthcare API in FHIR format to show just one scenario where you could implement a data processing pipeline.

The sky’s the limit though on processing bounded or unbounded data in GCP. Please leave your comments on how you are using Dataflow and feel free to ask any questions you have!

If you are interested in learning more, check out the Keyhole Dev Blog or these resources:

0 0 votes
Article Rating
Subscribe
Notify of
guest

0 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments