Video Tutorial (1 minute)
Github Repo: meroxa/turbine-examples/javascript/user-demo/
💡 To see how to move data out of Mongo to any data destination, check out our blog post here: https://meroxa.com/blog/streaming-changes-in-real-time-from-mongodb-to-apache-kafka
This blog covers using MongoDB as a downstream source. We will be moving data in real-time from PostgreSQL to MongoDB. Meroxa will keep track of any changes in your PostgreSQL database and post those CREATE, UPDATE or DELETE operations in MongoDB, keeping both in Sync.
Migrating data from PostgreSQL to MongoDB or vice versa can be a time-consuming process. With Meroxa you can do this in just a few lines of code. In this blog post we will be keeping our PostgreSQL database in sync with our MongoDB Atlas instance. In addition, we will briefly go over how you can transform the data going into MongoDB in real-time.
While this post covers getting data into Mongo, we can also pull data out of Mongo to any data destination by doing the opposite of what's covered in this post (Here’s a blog post on moving data from MongoDB to Apache Kafka in real-time).
What is Meroxa?
Meroxa is a streaming application platform where developers can run their Turbine applications. Meroxa handles the underlying streaming infrastructure so that developers can focus on building their applications. Turbine applications start with an upstream resource. Once that upstream resource is connected, Meroxa will handle streaming the data into the Turbine application for execution.
What is Turbine?
Turbine is a stream processing application framework for building event-driven data apps that respond to data in real-time and scale using cloud-native best practices. No bespoke domain-specific language (DSL).
You can even see how your app reacts to data by running your Turbine data applications locally—we show you exactly what will happen in Production, with faster iteration and development without having to deploy.
You can write your Turbine data apps using Go, Javascript, Python, or Ruby.
💡 If you prefer to use another language, Meroxa has support for many more languages coming, reach out directly to suggest a language by joining our community or by writing to support@meroxa.com
How it works
In this example, the Turbine app will create a CDC (Change Data Capture) connector from the platform to a PostgreSQL database (can be any database) and then writes that data to MongoDB Atlas.
Here's what happens and what we can do to stream and transform our data:
- The PostgreSQL connector receives changes in real-time and publishes them in the form of a stream.
- Inside our Turbine app we can write functions to transform and manipulate that data. We can do anything we would generally do with any programming language such as calling APIs or importing packages and libraries and change that data.
- The Meroxa Platform then streams that data to MongoDB in real-time, without you, the developer having to worry about scalability, flexibility or schemas.
Requirements
- Meroxa account
- Meroxa CLI
- Meroxa supported PostgreSQL DB
- MongoDB Instance
- Node JS (In this tutorial we will be using the Turbine Javascript Framework)
Setup
Once you have signed up for Meroxa and set up the Meroxa CLI you can follow the following 4 steps to get up and running:
💡 Here we are creating the resources via the CLI, you can also do so via the Meroxa Dashboard once you are logged in.
-
Adding your PostgreSQL and MongoDB Atlas Resources
PostgreSQL (Guide on configuring your PostgreSQL) - Source Resource
Below we are creating a PostgreSQL connection to Meroxa named
pg_db
.Note: To support CDC (Change Data Capture) we turn on the
logical_replication
flag.$ meroxa resource create pg_db \\\\ --type postgres \\\\ --url postgres://$PG_USER:$PG_PASS@$PG_URL:$PG_PORT/$PG_DB \\\\ --metadata '{"logical_replication":"true"}'
MongoDB Atlas (Guide on setting up Mongo Db Atlas) - Destination Resource
Below we are creating a MongoDB Atlas connection named
mdb
.$ meroxa resource create mdb \\ --type mongodb \\ --url "mongodb+srv://$MONGO_USER:$MONGO_PASS@$MONGO_URL/$MONGO_DATABASE_NAME"
-
Initializing our Turbine app
$ meroxa apps init postgres-to-mongo --lang js
This will create a directory called
postgres-to-mongo
with some boilerplate code to get you started. -
Coding our Turbine app
Open up your
postgres-to-mongo
folder in your preferred IDE. Let’s code our upstream and downstream resources that we defined in step 1 above.async run(turbine) { // First, identify your PostgreSQL source name as configured in Step 1 // In our case we named it pg_db let source = await turbine.resources("pg_db"); // Second, specify the table you want to access in your PostgreSQL DB let records = await source.records("User"); // Third, Process each record that comes in! let processed = await turbine.process(records, this.processData); // Fourth, identify your MongoDB destination resource configured in Step 1 let destination = await turbine.resources("mdb"); // Finally, specify which "collection" in mongo to write to. If none exists, it will be created await destination.write(processed, "user_copy"); }
In our
processData
function we will just be logging the time when the record was processed. However, in this function you can do anything to transform your records, such as calling an API, manipulating data, enriching data etc. In the code below we have some examples in the comments.processData(records) { for (const record of records) { const dateTimeGmt = new Date().toGMTString() console.log(`[DEBUG] Streaming Record To Destination: ${dateTimeGmt}`) // Encrypt data using a 3rd party library or package record.set( 'secretcode', sha256(record.get('secretcode')) ); // Format Data via a custom function record.set('phone_number', formatPhone(record.get('phone_number'))) // Enrich Data via an API const addressLookupResults = await googleMapsLookup(record.get('address')) const addressMetaData = generateAddressObject(addressLookupResults) record.set('address_metadata', addressMetaData); } records.unwrap(); return records; }
💡 For a more detailed example on using API’s & doing transformations in Turbine you can read our blog post here.
-
Deploying Your Application
Commit your changes
$ git add . $ git commit -m "Initial Commit"
Deploy your app
$ meroxa apps deploy
💡 To visualize your deployed application, you can check out an overview of our Turbine visualizations here.
Once your app is deployed you will see the PostgreSQL data populate in the
user_copy
collection in MongoDB Atlas. As records or changes come into your data source (PostgreSQL in this example), your Turbine app running on the Meroxa platform will process each record in real-time!Meroxa will set up all the connections and remove the complexities, so you, the developer, can focus on the important stuff.
Have questions or feedback?
If you have questions or feedback, reach out directly by joining our community or by writing to support@meroxa.com.
Happy Coding 🚀