Welcome to Part I of our Transformation series. In this series, we will show you how to use the Meroxa Platform in conjunction with the Turbine Framework to transform, enrich, orchestrate, and analyze data in real-time.
Throughout the series, we will use a PostgreSQL database with a table called "customers" that holds information about our customers and the orders they have made. We will perform many transformations to enrich this data set in real-time so we can better understand where our customers are from, engage with them, and visualize our data.
In Part I, we will enrich addresses by leveraging the Google Maps API to validate and augment existing customer address data. Street addresses often contain typos, spelling variations, and other errors. Google Maps is one of the best sources of location-based address data, which is why we chose it for enhancement and enrichment. Later, we will use this data to plot demographic insights about our customers for business analytics.
What is Meroxa?
Meroxa is a Stream Processing Application Platform as a Service (SAPaaS) where developers can run Turbine applications. Turbine is Meroxa's stream processing application framework for building event-driven apps that respond to data in real-time and scale using cloud-native best practices. Meroxa handles the underlying streaming infrastructure so developers can focus on building the applications. Turbine applications start with an upstream resource. Once that upstream resource is connected, Meroxa takes care of streaming the data into the Turbine application so it can run. Since Meroxa is a developer-first platform, engineers can ingest, orchestrate, transform, and stream data to and from anywhere using languages they already know, such as Go, JavaScript, Python, or Ruby. Support for Java and C# is also on the way.
💡 Meroxa has support for many source and destination resources. You can see which resources are supported here. If there's a resource not listed, you can request it by joining our community or by writing to support@meroxa.com. Meroxa is capable of supporting any data resource as a connector.
Overview
To get started with enriching and collecting metadata on our customers’ addresses, we will be leveraging the Google Maps geocoding API. If you are unfamiliar with this API, you can check out the Google Maps documentation here. We will use the search API to send in customer address information, which will return a more comprehensive object on the address, such as latitude and longitude.
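To make the API interaction concrete, here is a minimal sketch of what a geocoding lookup might look like in JavaScript. This is illustrative, not code from the app: the endpoint and response shape follow the public Geocoding API documentation, and the `GOOGLE_MAPS_API_KEY` environment variable is an assumption.

```javascript
// Sketch: querying the Google Maps Geocoding API for an address.
// Assumes a GOOGLE_MAPS_API_KEY environment variable and Node 18+ (global fetch).
const GEOCODE_ENDPOINT = "https://maps.googleapis.com/maps/api/geocode/json";

// Build the request URL for a free-form address string.
function buildGeocodeUrl(address, apiKey) {
  const params = new URLSearchParams({ address, key: apiKey });
  return `${GEOCODE_ENDPOINT}?${params.toString()}`;
}

// Pull latitude/longitude out of a Geocoding API response body,
// or return null when no result matched.
function extractLatLng(responseBody) {
  const result = responseBody.results && responseBody.results[0];
  if (!result) return null;
  return result.geometry.location; // { lat, lng }
}

async function geocode(address) {
  const url = buildGeocodeUrl(address, process.env.GOOGLE_MAPS_API_KEY);
  const res = await fetch(url);
  return extractLatLng(await res.json());
}
```

The interesting part for us is the response: alongside `lat`/`lng`, each result also carries fields like `formatted_address` and `place_id` that we can use as enrichment metadata.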
At a high level, Meroxa will detect changes in your PostgreSQL database via Change Data Capture (CDC). Each record from PostgreSQL will be streamed over to our Turbine application in real-time. In our case, it will take the address and enrich it via the Google Maps API. Once the record has been processed, it will be written to Snowflake.
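To picture what flows through the pipeline, here is a hypothetical change event for a row in the "customers" table. The field names and envelope are illustrative only, not Meroxa's exact CDC schema; in the Turbine app we access fields through `record.get(...)` rather than this raw shape.

```javascript
// Illustrative shape of a CDC change event as it might arrive in the
// Turbine app after an INSERT into the customers table.
// (Field names are assumptions, not Meroxa's exact wire format.)
const sampleChangeEvent = {
  operation: "insert",
  table: "customers",
  payload: {
    id: 42,
    customer_name: "Ada Lovelace",
    customer_address: "1600 Amphitheatre Pkwy, Mountain View, CA",
  },
};
```

Every time a row is inserted or updated in PostgreSQL, an event like this is streamed to the app, enriched, and then written to Snowflake.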
Take Me To The Code!
To start, you will need the following:

- A Meroxa account and the Meroxa CLI installed
- A PostgreSQL database (our source)
- A Snowflake account (our destination)
- A Google Maps API key

Once you have signed up for Meroxa and set up the Meroxa CLI, you can follow these steps to get up and running:
💡 Here we are creating the resources via the CLI; you can also do so via the Meroxa Dashboard once you are logged in.
Step 1: Adding your PostgreSQL and Snowflake Resources

PostgreSQL (Guide on configuring PostgreSQL) - Source Resource

Below we are creating a PostgreSQL connection to Meroxa named `pg_db`. Note: to support CDC (Change Data Capture), we turn on the `logical_replication` flag.

```shell
$ meroxa resource create pg_db \
    --type postgres \
    --url postgres://$PG_USER:$PG_PASS@$PG_URL:$PG_PORT/$PG_DB \
    --metadata '{"logical_replication":"true"}'
```
Snowflake (Guide on setting up Snowflake) - Destination Resource

Below we are creating a Snowflake connection named `snowflake`.

```shell
$ meroxa resource create snowflake \
    --type snowflakedb \
    --url "snowflake://$SNOWFLAKE_URL/meroxa_db/stream_data" \
    --username meroxa_user \
    --password $SNOWFLAKE_PRIVATE_KEY
```
Step 2: Initializing Turbine

```shell
$ meroxa apps init part-one-google-maps-enrichment --lang js
```
Step 3: Writing your Turbine code

Open up your `part-one-google-maps-enrichment` folder in your preferred IDE. You will see boilerplate code that explains where to wire up the sources and destinations named in Step 1. In our case, we just need the following to set up the connection between PostgreSQL and Snowflake:

```javascript
async run(turbine) {
  // First, identify your PostgreSQL source name as configured in Step 1.
  // In our case we named it pg_db.
  let source = await turbine.resources("pg_db");

  // Second, specify the table you want to read in your PostgreSQL DB.
  let records = await source.records("customers");

  // Optional: process each record that comes in!
  let transformed = await turbine.process(records, this.transform);

  // Third, identify your Snowflake destination name configured in Step 1.
  let destination = await turbine.resources("snowflake");

  // Finally, specify which table to write that data to.
  await destination.write(transformed, "customer_addresses_enriched");
}
```
`turbine.process` allows developers to write a function that will be run on each record; we use it to preprocess our data before sending it to Snowflake. Below is our `transform` function, which loops through each record coming in from the data stream. We call the Google Maps API on the address field of every record and generate an address object that contains metadata on the address. Later we write that metadata to a new table in Snowflake.

💡 You can view the complete repository for this data app on GitHub here.
```javascript
async transform(records) {
  for (const record of records) {
    const customer_address = record.get("customer_address");
    console.log("[DEBUG] customer_address ===> ", customer_address);
    if (!customer_address || customer_address.length === 0) {
      console.log("[ERR] customer_address ===> ", customer_address);
      return;
    }

    const googleMapsLookupResponse = await googleMapsLookup(customer_address);
    console.log("[DEBUG] googleMapsLookupResponse ===> ", JSON.stringify(googleMapsLookupResponse));
    if (!googleMapsLookupResponse) {
      console.log("[ERR] googleMapsLookupResponse ===> ", JSON.stringify(googleMapsLookupResponse));
      return;
    }

    const address_metadata = generateAddressObject(googleMapsLookupResponse);
    console.log("[DEBUG] address_metadata ===> ", address_metadata);
    record.set("address_metadata", address_metadata);
    for (const key in address_metadata) {
      record.set(key, address_metadata[key]);
    }
  }
  records.unwrap();
  return records;
}
```
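The `transform` function above relies on two helpers, `googleMapsLookup` and `generateAddressObject`. Here is one possible way they could be implemented; this is a sketch under assumptions (a `GOOGLE_MAPS_API_KEY` environment variable, the public Geocoding API endpoint, and an illustrative choice of metadata fields), and the complete repository may implement them differently.

```javascript
// Sketch: call the Google Maps Geocoding API for an address string.
// Returns the full response body, or null if there were no matches.
// Assumes Node 18+ (global fetch) and a GOOGLE_MAPS_API_KEY env var.
async function googleMapsLookup(address) {
  const params = new URLSearchParams({ address, key: process.env.GOOGLE_MAPS_API_KEY });
  const res = await fetch(`https://maps.googleapis.com/maps/api/geocode/json?${params}`);
  const body = await res.json();
  return body.results && body.results.length ? body : null;
}

// Sketch: flatten the first geocoding result into the metadata
// columns we write to Snowflake (field choice is illustrative).
function generateAddressObject(googleMapsLookupResponse) {
  const result = googleMapsLookupResponse.results[0];
  return {
    formatted_address: result.formatted_address,
    latitude: result.geometry.location.lat,
    longitude: result.geometry.location.lng,
    place_id: result.place_id,
  };
}
```

Because `generateAddressObject` returns a flat object, the `for (const key in address_metadata)` loop in `transform` writes each field as its own column on the record, which keeps the Snowflake table easy to query.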
Step 4: Deploying Your App

Commit your changes:

```shell
$ git add .
$ git commit -m "Initial Commit"
```

Deploy your app:

```shell
$ meroxa apps deploy
```
Once your app is deployed, you will see your Snowflake DB populate with all the enriched data from the PostgreSQL table. You can also insert a record into your table to see it stream over to Snowflake in real-time!
Meroxa sets up all the connections and removes the complexity, so you, the developer, can focus on the important stuff.
What's Next
In our next blog post, we will look at how to use Meroxa with the Twilio and Telnyx APIs to transform telephony data and trigger SMS events to new customers in our database. We will enrich phone numbers to validate which customers in our database registered with a mobile number capable of receiving SMS messages, and later we will trigger SMS messages to those valid numbers. Stay tuned!
Have questions or feedback?
If you have questions or feedback, reach out directly by joining our community or by writing to support@meroxa.com.
Happy Coding 🚀