Snowflake's rise to prominence in data-driven companies is undeniable, yet many users encounter a common bottleneck: the challenge of real-time data ingestion, particularly when it comes to upserts and deletes. Snowflake's native data ingest services, such as Snowpipe and Snowpipe Streaming, fall short of offering these crucial capabilities directly. This is where the innovative Snowflake Conduit Connector steps in, bridging this critical gap by enabling safe and real-time upserts or marking records for deletion in Snowflake. This article takes a closer look at the development journey of the Snowflake Conduit Connector, offers a guide on setting it up, evaluates its data stream performance, and previews future enhancements.
Snowflake's architecture revolutionized data warehousing with its cloud-native approach, but its real-time data manipulation capabilities needed a boost. The Snowflake Conduit Connector is designed to extend Snowflake's functionality, allowing for real-time data upserts and deletions, features eagerly awaited by many Snowflake users. This connector not only enhances Snowflake's capabilities but also ensures data integrity and timely data updates, critical for operational and analytical workloads.
The Snowflake Conduit Connector empowers users to seamlessly integrate real-time data upserts and deletes into their Snowflake data warehouse. This guide provides a comprehensive walkthrough for setting up the Snowflake Conduit Connector, ensuring you can quickly leverage its capabilities to enhance your data management processes.
Before starting the setup process, ensure you have a Snowflake account with privileges to create roles and users, as well as a running Conduit instance. Then create a dedicated role and user for the connector to use:
CREATE ROLE conduit_connector_role;
CREATE USER conduit_connector_user PASSWORD = '<strong_password>' DEFAULT_ROLE = conduit_connector_role;
GRANT ROLE conduit_connector_role TO USER conduit_connector_user;
-- Grant the role access to the objects the connector will write to;
-- the warehouse, database, and schema names below are placeholders:
GRANT USAGE ON WAREHOUSE <warehouse> TO ROLE conduit_connector_role;
GRANT USAGE ON DATABASE <database> TO ROLE conduit_connector_role;
GRANT ALL ON SCHEMA <database>.<schema> TO ROLE conduit_connector_role;
1. Log into Conduit.
2. Create a new connector, choosing Snowflake as the destination.
3. Configure the connector settings with your Snowflake connection details: the user and role created above, plus the target warehouse, database, schema, and table.
4. Map your data streams to the Snowflake destination.
5. Review the configuration and save it.
6. Activate the connector to start streaming data.
One of the core advantages of the Snowflake Conduit Connector is its performance in handling data streams. Our development efforts were centered on ensuring the connector could manage high volumes of data with minimal latency, making real-time data ingestion, upserts, and deletes a reality. Here, we delve into performance metrics, showcasing the efficiency and reliability of the connector in various scenarios and highlighting how it stands up to the demands of modern data-driven operations.
Developing the Snowflake Conduit Connector was a journey marked by both challenges and breakthroughs. From conceptualization to launch, our team navigated intricate technical hurdles, all while keeping the user's needs at the forefront. Along the way, we also uncovered how other platforms produced results with missing data.
Chief among the issues we encountered during development: Snowflake provides no direct way of doing upserts, so we had to bench-test our own workarounds for uploading data. We made several attempts:
Uploading data to Snowflake as a CSV file, copying it from the CSV into a temporary table, then merging it into the final table.
Uploading data to Snowflake as an Avro file, copying it from the Avro file into a temporary table, then merging it into the final table.
Sample Copy and Merge Query:
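A sketch of the shape this approach took; the table, stage, and file format names here are placeholders rather than the original query:

CREATE TEMPORARY TABLE my_table_temp LIKE my_table;

COPY INTO my_table_temp
    FROM @file/file.csv.gz
    FILE_FORMAT = (FORMAT_NAME = 'CSV_CONDUIT_SNOWFLAKE');

MERGE INTO my_table AS a
USING my_table_temp AS b ON a.id = b.id
WHEN MATCHED THEN UPDATE SET a.data = b.data
WHEN NOT MATCHED THEN INSERT (id, data) VALUES (b.id, b.data);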
Both of these proved too slow, so we ended up going with a third approach: upload the data in CSV format and merge it directly from the staged CSV into the final table.
Sample Merge Query On New Records:
MERGE INTO my_table AS a USING (
    SELECT
        $1 meroxa_operation,
        $2 meroxa_created_at,
        $3 meroxa_updated_at,
        $4 meroxa_deleted_at,
        $5 data,
        $6 id  -- the record key; the staged CSV must also carry it for the join below
    FROM @file/file.csv.gz (FILE_FORMAT => 'CSV_CONDUIT_SNOWFLAKE')
) AS b ON a.id = b.id
WHEN MATCHED AND (b.meroxa_operation = 'create' OR b.meroxa_operation = 'snapshot') THEN
    UPDATE SET
        a.meroxa_operation = b.meroxa_operation,
        a.meroxa_created_at = b.meroxa_created_at,
        a.meroxa_updated_at = b.meroxa_updated_at,
        a.meroxa_deleted_at = b.meroxa_deleted_at,
        a.data = b.data
WHEN NOT MATCHED AND (b.meroxa_operation = 'create' OR b.meroxa_operation = 'snapshot') THEN
    INSERT (id, meroxa_operation, meroxa_created_at, meroxa_updated_at, meroxa_deleted_at, data)
    VALUES (b.id, b.meroxa_operation, b.meroxa_created_at, b.meroxa_updated_at, b.meroxa_deleted_at, b.data);
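The sample above only covers new records. Updates and deletes become additional clauses in the same merge statement; the following is a sketch rather than the connector's exact query, assuming deletes are soft deletes that set meroxa_deleted_at instead of removing the row:

-- Sketch: extra clauses that slot in before the closing semicolon of the merge above
WHEN MATCHED AND b.meroxa_operation = 'update' THEN
    UPDATE SET
        a.meroxa_operation = b.meroxa_operation,
        a.meroxa_updated_at = b.meroxa_updated_at,
        a.data = b.data
WHEN MATCHED AND b.meroxa_operation = 'delete' THEN
    UPDATE SET
        a.meroxa_operation = b.meroxa_operation,
        a.meroxa_deleted_at = b.meroxa_deleted_at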
To speed up processing further, we split each incoming batch of records (say, 10,000 records) into several chunks, which let us use goroutines to parallelize the work of generating and uploading the CSV files.
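To make that concrete, here is a minimal Go sketch of the chunk-and-parallelize step. Record, writeChunk, and the chunk size are illustrative stand-ins, not the connector's actual types: each chunk is rendered to a gzip-compressed CSV in memory by its own goroutine, mirroring the file generation and upload work described above.

package main

import (
	"bytes"
	"compress/gzip"
	"encoding/csv"
	"fmt"
	"sync"
)

// Record is a stand-in for the connector's internal record type; the real
// field set is richer (operation timestamps, raw payload, and so on).
type Record struct {
	Key       string
	Operation string
	Payload   string
}

// writeChunk renders one chunk of records as a gzip-compressed CSV in memory.
func writeChunk(records []Record) ([]byte, error) {
	var buf bytes.Buffer
	gz := gzip.NewWriter(&buf)
	w := csv.NewWriter(gz)
	for _, r := range records {
		if err := w.Write([]string{r.Operation, r.Key, r.Payload}); err != nil {
			return nil, err
		}
	}
	w.Flush()
	if err := w.Error(); err != nil {
		return nil, err
	}
	if err := gz.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	batch := make([]Record, 10000) // pretend this batch arrived from the pipeline
	const chunkSize = 2500         // 4 chunks => 4 goroutines generating files in parallel

	var wg sync.WaitGroup
	for start := 0; start < len(batch); start += chunkSize {
		end := start + chunkSize
		if end > len(batch) {
			end = len(batch)
		}
		wg.Add(1)
		go func(chunk []Record) {
			defer wg.Done()
			data, err := writeChunk(chunk)
			if err != nil {
				fmt.Println("chunk failed:", err)
				return
			}
			// The real connector PUTs each compressed file to a Snowflake
			// stage here; this sketch just reports the compressed size.
			fmt.Printf("generated %d bytes\n", len(data))
		}(batch[start:end])
	}
	wg.Wait()
}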
While Snowflake allows you to define primary keys, it doesn't enforce them. That's a huge issue, as it can allow duplicates on the primary key to be inserted. We've taken care of that by deduplicating in two places: during CSV file generation and again during the merge. And because a batch contains no duplicates once we have compacted its records in order (say, a CREATE followed by an UPDATE for the same record collapses into one row), the merge no longer depends on record ordering within a batch.
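Here is a minimal Go sketch of that per-key compaction, again with an illustrative Record type: the last record seen for each key wins, and first-seen key order is preserved so the output stays deterministic.

package main

import "fmt"

// Record is a stand-in for the connector's internal record type.
type Record struct {
	Key       string
	Operation string
	Payload   string
}

// compact collapses all records sharing a key into one, keeping the latest
// record per key while preserving the order in which keys first appeared.
// The real connector also reconciles operations (a CREATE followed by an
// UPDATE must still end up as an insert); this sketch just keeps the last
// record seen for each key.
func compact(batch []Record) []Record {
	latest := make(map[string]Record, len(batch))
	order := make([]string, 0, len(batch))
	for _, r := range batch {
		if _, seen := latest[r.Key]; !seen {
			order = append(order, r.Key)
		}
		latest[r.Key] = r // later records for the same key win
	}
	out := make([]Record, 0, len(order))
	for _, k := range order {
		out = append(out, latest[k])
	}
	return out
}

func main() {
	batch := []Record{
		{Key: "1", Operation: "create", Payload: "v1"},
		{Key: "1", Operation: "update", Payload: "v2"}, // supersedes the create above
		{Key: "2", Operation: "create", Payload: "v1"},
	}
	fmt.Println(compact(batch)) // [{1 update v2} {2 create v1}]
}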
We also had to ensure that files were properly compressed and uploaded so that no data was lost and upload wait times stayed short.
The Snowflake Conduit Connector is a living project, with ongoing enhancements aimed at addressing the evolving needs of Snowflake users. We are committed to continuous improvement, drawing on user feedback and emerging data management trends to refine and expand the connector's capabilities, with more features already on the horizon.
The Snowflake Conduit Connector is more than just a solution to a problem; it's a testament to the power of innovation in the face of technical limitations. By enabling real-time upserts and deletes, this connector not only enhances Snowflake's capabilities but also empowers data-driven companies to manage their data more effectively and efficiently. As we continue to develop and improve the Snowflake Conduit Connector, we look forward to unlocking even greater possibilities for our users, ensuring their data pipelines are as dynamic and robust as the insights they seek to derive.