Streaming Salesforce Objects into Google BigQuery

My company uses a variety of enterprise-level tools, and one of the most central of them all is Salesforce. Our analytics stack centers around BigQuery, and we use Fivetran, an excellent integration service, to pipe our Salesforce data into BigQuery. The problem is that Fivetran is a batch ETL, so there is a delay (often of hours) before our salespeople can see changes reflected in our analytics tool.

Thus, our salespeople (the most important people in the company) don’t use our analytics tool and just use Salesforce reporting, which means they miss out on a ton of enriched analysis we could be serving them.

So, my mission for the last few weeks has been to stream Salesforce into BigQuery. This is how I did it.

Picking a method

Salesforce is large and complicated, and over time has added a few possible methods for data replication.

The one that seemed most obvious when I started was something called PushTopics, which let you define a SOQL query centered on an object you are interested in streaming. If something changes in that object, the PushTopic pushes a payload to your waiting CometD client (setting one of those up was enough of a discovery process to warrant its own blog post), which you can then relay to BigQuery.
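For reference, a PushTopic is itself just a record you insert. Here is a minimal sketch of defining one with the simple_salesforce Python library; the credentials, topic name, and query are placeholders rather than anything we actually ran:

from simple_salesforce import Salesforce

# Placeholder credentials
sf = Salesforce(username='me@example.com', password='...', security_token='...')

# A PushTopic is just another sObject; Query is the SOQL to watch
sf.PushTopic.create({
    'Name': 'AccountUpdates',                # hypothetical topic name
    'Query': 'SELECT Id, Name FROM Account',
    'ApiVersion': 44.0,
    'NotifyForOperationCreate': True,
    'NotifyForOperationUpdate': True,
    'NotifyForOperationDelete': True,
    'NotifyForFields': 'Referenced',         # fire when queried fields change
})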

The problems with PushTopics, for me, were twofold. First, you cannot stream Long Text Area fields. We use those, so that’s a problem. Second, there is a limit on the length of your query, and SOQL will not allow SELECT * statements, so you have to list out every column. Sure, you can set up multiple PushTopics for a single object and glue the columns together later in Python, but that is kind of insane.

My next best guess was platform events. Platform events take you into Apex, Salesforce’s Java-inspired programming language, which is not something I necessarily wanted to do. However, in exchange, you get to send pretty arbitrary payloads, and you lose the restriction on query size. All you have to do is set up something called an Apex Trigger, point it at the object you want to stream, and have that trigger publish an event to the platform event bus, which would then hit my eager CometD client.

Sounds pretty good, and once I got it working, I thought this might be our future. I wasn’t happy about having to use CometD, which requires that I keep some sort of server set up and running at all times (a massive potential point of failure), but life has hardships. I was about to turn my attention downstream when I saw something in the Salesforce Apex Trigger documentation that made me do a double take, and it ended up being our ultimate solution.

That solution is callouts. Callouts are arbitrary HTTP requests, bypassing message buses and CometD clients completely. I set up a Google Cloud Function webhook endpoint that would save payloads to Google Cloud Storage, and then I started experimenting. I was happy to find that, although I will never quite understand Apex datatypes, the results were fairly simple.
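That webhook was only a few lines. Here is a minimal sketch of the idea as a first-generation Python HTTP Cloud Function, with a hypothetical bucket name:

import datetime

from google.cloud import storage

def save_payload(request):
    # Archive the raw callout body to Cloud Storage
    client = storage.Client()
    bucket = client.bucket('my-salesforce-payloads')  # hypothetical bucket
    # Timestamped object names so payloads never overwrite each other
    name = 'callouts/' + datetime.datetime.utcnow().isoformat() + '.json'
    bucket.blob(name).upload_from_string(request.get_data(as_text=True))
    return ('ok', 200)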

From what I can tell, the only real downside of callouts is a lack of replayability. This would bother me more if it weren’t for two important factors:

  1. First, I’m using Google PubSub to ingest messages, and its uptime is virtually 100% (much higher than if I were running a Linux cloud server myself).
  2. Second, we are keeping our Fivetran batch ETL integration, so I can occasionally combine and de-dupe the two parallel versions of the same table (the streaming version and the batch version) in case any messages were missed; I sketch that reconciliation below.
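That reconciliation can live in a scheduled job. A rough sketch, assuming both copies of the table share a schema with Id and SystemModstamp columns; the dataset and table names are stand-ins:

from google.cloud import bigquery

client = bigquery.Client()

# Keep only the newest version of each record across both copies
DEDUPE_SQL = '''
CREATE OR REPLACE TABLE analytics.account_combined AS
SELECT * EXCEPT(rn)
FROM (
  SELECT *, ROW_NUMBER() OVER (
    PARTITION BY Id ORDER BY SystemModstamp DESC
  ) AS rn
  FROM (
    SELECT * FROM salesforce.account          -- Fivetran batch copy
    UNION ALL
    SELECT * FROM salesforce_stream.account   -- streamed copy
  )
)
WHERE rn = 1
'''

client.query(DEDUPE_SQL).result()  # blocks until the job finishes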

So, overall, I feel pretty good about escaping the arbitrary restrictions placed on us by Salesforce’s other solutions and going with this more bake-it-yourself strategy.

Implementation

It’s nice that there aren’t many moving parts here – the basic recipe is an Apex Trigger (that sets the whole thing in motion when a data change occurs), an Apex “CalloutPubSub” class (to handle callouts to my Google PubSub topic’s endpoint), and a few entries in the Remote Site Settings page.

My prototype object for streaming has been Account, so everything here will center around that, but these lessons should be broadly applicable to other objects.

Apex Trigger

Apex Triggers can be defined in the Developer Console (click on your name in the top-right of Salesforce and, if you have the right permissions, you should see it in that menu). Once you’re in the Developer Console, go to File -> New -> Apex Trigger.

This is my first time writing in Apex so I’m sure this is awful, but my trigger is as follows:

trigger Account_message on Account (after insert, after update, before delete) {
 
    String query = 'SELECT Id,IsDeleted,MasterRecordId,Name,Type,RecordTypeId,ParentId,BillingStreet,BillingCity,BillingState,BillingPostalCode,BillingCountry,BillingStateCode,BillingCountryCode,BillingLatitude,BillingLongitude,BillingGeocodeAccuracy,ShippingStreet,ShippingCity,ShippingState,ShippingPostalCode,ShippingCountry,ShippingStateCode,ShippingCountryCode,ShippingLatitude,ShippingLongitude,ShippingGeocodeAccuracy,Phone,Fax,AccountNumber,Website,PhotoUrl,Sic,Industry,AnnualRevenue,NumberOfEmployees,Ownership,TickerSymbol,Description,Rating,Site,CurrencyIsoCode,OwnerId,CreatedDate,CreatedById,LastModifiedDate,LastModifiedById,SystemModstamp,LastActivityDate,LastViewedDate,LastReferencedDate,IsPartner,Jigsaw,JigsawCompanyId,AccountSource,SicDesc,AVA_SFCORE__Billing_Last_Validated__c,AVA_SFCORE__ExemptEntityType__c,AVA_SFCORE__Shipping_Last_Validated__c,Bubble_Up_By__c,Bubble_Up_Date__c,Business_Description__c,Completed_Activity_Count__c,Customer_ID__c,Do_Not_Sell_Reason__c,Do_Not_Sell__c,Escalated_By__c,Facebook__c,Google__c,Last_Activity_Date__c,Last_Activity_Notes__c,Last_Activity_Type__c,Last_Lead_Source_Name__c,Lead_Dev_Locked__c,Lead_Source_Name__c,Lead_Source__c,LinkedIn__c,Marketo_DB_URL__c,Marketo_ID__c,Most_Recent_Opportunity__c,Most_Recent_Profile__c,NetSuite_Customer_ID__c,NetSuite_Partner_ID__c,New_Lead_Date__c,Owner_Priority__c,Partner_Affiliate_Code__c,Partner_Affiliate_Password__c,Partner_Code__c,Partner_Commission_Percent__c,Partner_ID__c,Partner_Lead__c,Partner_Program__c,Partner_Progression_Status__c,Partner_Sales_Owner__c,Partner_Sales_Territory__c,Partner_Start_Date__c,Partner__c,Pinterest__c,Primary_Contact__c,Primary_IS_App__c,Qualification_Activity_Count__c,Qualification_Status__c,Reason__c,Sales_Rep__c,Skype__c,Twitter__c,Unique_Key_Code__c,You_Tube__c,ByPass_Edit_Validation__c,Customer_Referral_Date_From_MA__c,Lead_Reason__c,Make_Callout_Insert__c,Make_Callout_Update__c,NetSuite_Prospect_ID__c,Partner_Affiliate_Code_From_MA__c,Partner_Last_Referred_Date__c,Partner_Referred_Date_from_MA__c,Referral_Type__c,Referring_Customer_App_Name__c,Referring_Customer_Email__c,Referring_Customer_Name__c,Surpress_Callout__c,SyncToNetSuite__c,TriggerAccountUpdate__c,Partner_Business_Type__c,Referral_Notes__c,Purchased_Lead_Notes__c,Marketing_Qualified__c,Most_Recent_Referral_for_Owner__c,Default_Partner_Campaign__c,Parent_Primary_Contact__c,Partner_Type__c,Contract_App_name__c,Contract_App_status__c,Date_customer_signed_up__c,Edition_of_Infusionsoft_app__c,Most_Recent_Asset__c,Partner_Recruiter__c,Partner_Sales_YTD__c,NAICS_Code__c,Campaign__c,Partner_from_MA__c,Tracking_Link__c,Event_Pricing__c,Name_of_Event__c,Speaker__c,Time_Zone__c,Total_Assets__c,AVA_SFCORE__Billing_County__c,AVA_SFCORE__Shipping_County__c,AdvocateHub__Referral_Source__c,Referral_ID__c,Annual_Revenue_bp__c,Business_Description_bp__c,Business_Type_bp__c,Channels_Points_of_Sale_bp__c,How_was_list_built_bp__c,Industry_bp__c,Key_Products_Services_bp__c,Marketing_Automation_System_bp__c,Marketing_List_Size_bp__c,Monthly_Marketing_Budget_bp__c,Stage_Number_Value_bp__c,Sub_Industry_bp__c,Who_Manages_Website_bp__c,Years_in_Business_bp__c,of_Employees_bp__c,CRM_System__c,Challenges_Needs__c,Ecommerce_System__c,Recalculate_Sharing__c,Link_Posted_By__c,MBR_Has_Active_Contract__c,MBR_Is_Partner_Customer__c,MBR_Most_Recent_App_Edition__c,MBR_Most_Recent_Contract_Start__c,MBR_Most_Recent_Kickstart_Completion__c,MBR_Not_A_Free_App__c,Partners_Success_Coach__c,Value_Added_Reseller_Track_Level__c,Service_Track_Level__c,App_Developer_Track_Level__c,Original_App__c,Aggregate_CHS__c,Partner_Service_CHS__c,Partners1_ID__c,partner_overview_record__c,NAICS_Name__c,SIC_Code__c,SIC_Name__c,UniqueEntry__Account_Dupes_Ignored__c,rrpu__Alert_Message__c,Datanyze_Address__c,Datanyze_Alexa_Rank__c,Datanyze_City__c,Datanyze_Country__c,Datanyze_Description__c,Datanyze_Employees__c,Datanyze_Funding__c,Datanyze_Industry__c,Datanyze_Monthly_Tech_Spend__c,Datanyze_Phone__c,Datanyze_Public_Text__c,Datanyze_Revenue_Text__c,Datanyze_State__c,Datanyze_Tags__c,Datanyze_Twitter__c,Datanyze_Year_Founded__c,Datanyze_Zip__c,Datanyze_Technologies__c,Datanyze_Technology_History__c,Datanyze_Predict_Score__c,Datanyze_Predict_Rating__c,Customer_Success_Guide__c,Datanyze_Sync__c,Wepay_Discussed__c,Partner_Serviced_Customer__c,Service_Partner__c FROM Account where id in (';
    String endpoint = 'https://pubsub.googleapis.com/v1/projects/infusionsoft-looker-poc/topics/salesforce_all:publish?topic=projects%2Finfusionsoft-looker-poc%2Ftopics%2Fsalesforce_all&access_token=';

    Map<String, Object> data_list = new Map<String, Object>();
    Map<String, String> metadata = new Map<String, String>();

    if (Trigger.isDelete) {

        // Deleted records are only available in Trigger.old
        List<Account> list_of_changes = Trigger.old;
        List<String> list_of_changed_ids = new List<String>();

        for (Account o : list_of_changes) {
            list_of_changed_ids.add('\'' + String.valueOf(o.get('Id')) + '\'');
        }

        // Close the WHERE clause with the changed IDs and re-run Fivetran's query
        query = query + String.join(list_of_changed_ids, ', ') + ')';

        List<sObject> new_data_list = Database.query(query);

        metadata.put('Type', 'Delete');

        data_list.put('metadata', metadata);
        data_list.put('payload', new_data_list);

        CalloutPubSubClass.makeCallout(JSON.serialize(data_list), endpoint);

    } else {

        // Inserts and updates are treated identically; both read Trigger.new
        List<Account> list_of_changes = Trigger.new;
        List<String> list_of_changed_ids = new List<String>();

        for (Account o : list_of_changes) {
            list_of_changed_ids.add('\'' + String.valueOf(o.get('Id')) + '\'');
        }

        query = query + String.join(list_of_changed_ids, ', ') + ')';

        List<sObject> new_data_list = Database.query(query);

        metadata.put('Type', 'Update');

        data_list.put('metadata', metadata);
        data_list.put('payload', new_data_list);

        CalloutPubSubClass.makeCallout(JSON.serialize(data_list), endpoint);
    }
}

The script here is split into delete and update/insert actions, with update and insert treated the same. Notice in the trigger declaration that I use after insert and after update but before delete: I want to run this trigger after insertions or updates (which makes sense, I want the new stuff) but before a deletion occurs. The before matters more than it might look, because the trigger re-queries the changed records by Id; in a before delete trigger those rows still exist and the query can find them, whereas after the deletion it would come back empty.

Another thing to notice is my extremely specific query. This came from Fivetran: in the Salesforce BigQuery dataset where Fivetran loads data, there is a salesforce.fivetran_query table that lists the specific query Fivetran runs when pulling data from the Salesforce API. My job is to replicate Fivetran’s work, so I run the exact same query, using a loop and string concatenation to tack the changed IDs onto the WHERE clause.
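Pulling that query out is a one-liner against BigQuery. A quick sketch, with the caveat that the column names in fivetran_query here are my guesses for illustration:

from google.cloud import bigquery

client = bigquery.Client()

# Column names in fivetran_query are assumptions, not a documented schema
sql = "SELECT query FROM salesforce.fivetran_query WHERE table_name = 'account'"
for row in client.query(sql).result():
    print(row.query)  # paste into the trigger, minus the WHERE clause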

Beyond that, things are fairly simple. I gather all of my changed IDs, add them to my query, run said query, and then take that data and put it into a payload. I add some metadata so that I know downstream if this is a delete or update/insert (which I just label as an update) and then I send it to the CalloutPubSubClass, which I explain next.

Apex CalloutPubSub Class

For some reason I edit my Apex classes outside of the Developer Console (I think some guide sent me in this direction?). Anyway, in Salesforce I click on Setup, type “apex class” into the Quick Find box, and the page comes up. But yeah, you can also do this in the Developer Console.

My garbage can of Apex code:

public class CalloutPubSubClass {
    public static String getAccessToken() {
        String url = 'https://www.googleapis.com/oauth2/v4/token';
        String client_id = '<REDACTED>';
        String client_secret = '<REDACTED>';
        String refresh_token = '<REDACTED>';
        String body = 'client_id=' + client_id + '&client_secret=' + client_secret + '&refresh_token=' + refresh_token + '&grant_type=refresh_token';
        
        HttpRequest req = new HttpRequest();
        req.setEndpoint(url);
        req.setMethod('POST');
        req.setbody(body);
        Http http = new Http();
        HTTPResponse response = http.send(req);
        
        String token_response = response.getBody();
        
        Map<String, String> token_json = (Map<String, String>)JSON.deserialize(token_response, Map<String,String>.class);
        String access_token = token_json.get('access_token');
        
        return access_token;
    }

    @future(callout=true)
    public static void makeCallout(String payload, String endpoint_raw) {
        String access_token = getAccessToken();
        
        // PubSub requires the message data itself to be base64-encoded
        Blob myBlob = Blob.valueof(payload);
        String payload64 = EncodingUtil.base64Encode(myBlob);
        
        // Build the publish body: {"messages": [{"data": "<base64>"}]}
        Map<String, Object> payload_json = new Map<String, Object>();
        Set<Object> messages = new Set<Object>();
        Map<String, String> data = new Map<String, String>();
        
        data.put('data', payload64);
        messages.add(data);
        payload_json.put('messages', messages);
    
        String endpoint = endpoint_raw + access_token;
        HttpRequest request = new HttpRequest();
        request.setEndPoint(endpoint);
        request.setMethod('POST');
        request.setHeader('Content-Type', 'application/json;charset=UTF-8');
        request.setTimeout(10000);
        request.setBody(JSON.serialize(payload_json));
        // Skip the real HTTP call when running unit tests
        if (!Test.isRunningTest()) {
            HttpResponse response = new HTTP().send(request);
        }
    }
}

Much of what goes on here is specific to dealing with Google PubSub – for instance, the whole getAccessToken() method is part of that. If you’re trying to do that, then this code block will be exceptionally useful, but if you’re not then just ignore it.
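If you want to sanity-check that token exchange outside of Salesforce, the same refresh-token flow is easy to replicate. A sketch in Python with placeholder credentials:

import requests

def get_access_token(client_id, client_secret, refresh_token):
    # Same exchange as the Apex getAccessToken() above
    resp = requests.post(
        'https://www.googleapis.com/oauth2/v4/token',
        data={
            'client_id': client_id,
            'client_secret': client_secret,
            'refresh_token': refresh_token,
            'grant_type': 'refresh_token',
        },
    )
    resp.raise_for_status()
    return resp.json()['access_token']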

One line, @future(callout=true), is exceptionally important. The @future annotation makes the method run asynchronously in its own execution context, and the callout=true part is what permits it to make HTTP callouts at all, since Apex won’t let a trigger make a callout synchronously. It also helps in situations where a bulk change occurs and the trigger chunks it into 50 different payloads. A nice man on the internet told me to put it there, and he was right.

Also note the base64 encoding step. This is not necessary for most destinations, but the PubSub publish API requires each message’s data field to be base64-encoded. It took me approximately fifty tries to get this right, since Google somehow turns the biggest collection of engineering talent on earth into the worst collection of engineering documentation on earth, but whatever. I wouldn’t hold a grudge.
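For the curious, this is the shape the publish endpoint wants. A sketch in Python (here passing the token as a Bearer header rather than the query parameter my Apex code uses; the project and topic names are placeholders):

import base64
import json

import requests

access_token = '<OAUTH_TOKEN>'  # obtained via the refresh-token flow above

payload = json.dumps({'metadata': {'Type': 'Update'}, 'payload': [{'Id': '001xxxxxxxxxxxx'}]})

# Each message's data field must be base64-encoded
body = {'messages': [{'data': base64.b64encode(payload.encode()).decode()}]}

resp = requests.post(
    'https://pubsub.googleapis.com/v1/projects/my-project/topics/my-topic:publish',
    headers={'Authorization': 'Bearer ' + access_token},
    json=body,
)
resp.raise_for_status()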

None of this will work when you implement it, though, because there is one more step, one that I forgot every single time I had to change endpoints.

Remote Site Settings

Salesforce tries to maintain a pretty secure environment, and part of that is not letting Apex code within Salesforce make random callouts with who-knows-what kind of data leaving the site. So you need to whitelist your callout endpoint, and also (if applicable) the endpoint for obtaining your access token. Just go to Setup, type “remote site” into the Quick Find box, and you’ll see the page you need. I left the Disable Protocol Security box unchecked because I would only ever be using HTTPS.

Further down the line

The main point of writing this is to have something to show our Salesforce admins to convince them to take my code out of the sandbox and put it into production, so what happens downstream isn’t a big concern here. I’ll give a broad sketch and save the details for another post.

Salesforce invokes the trigger in batches of up to 200 records at a time, so data is chunked accordingly, put through our specific query, and sent to PubSub. Thankfully, serialized sObjects omit null fields, so the payload sizes are very manageable and easy to process.

My PubSub topic’s only subscriber right now is a Google Cloud Function, which takes the payload and determines whether it is a delete or an update. If it is a delete, it sets last_modified to the current UTC time and is_deleted to True, then streams the record into BigQuery. If it is an update, it can skip those transforms and go straight to BQ. A copy of the original payload is also sent to Google Cloud Storage, which I may turn off once everything is stable.
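A compressed sketch of that function, written as a background Cloud Function triggered by the topic; the destination table and the exact field names are stand-ins for ours:

import base64
import datetime
import json

from google.cloud import bigquery

bq = bigquery.Client()
TABLE = 'salesforce_stream.account'  # hypothetical destination table

def handle_message(event, context):
    # PubSub-triggered functions receive the message data base64-encoded
    envelope = json.loads(base64.b64decode(event['data']))
    rows = envelope['payload']
    if envelope['metadata']['Type'] == 'Delete':
        # Deletes get a fresh timestamp and a tombstone flag
        now = datetime.datetime.utcnow().isoformat()
        for row in rows:
            row['last_modified'] = now
            row['is_deleted'] = True
    # Streaming insert; returns a list of per-row errors, empty on success
    errors = bq.insert_rows_json(TABLE, rows)
    if errors:
        raise RuntimeError(errors)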

And that’s it! I can’t believe that took me two weeks, but at least it’s done now. I’ll probably be making posts about CometD and my Google Cloud Project pipeline for this soon.