Data Quality and Reliability
Know about problems before they mess up your production data, and keep your data reliable and its quality high
Introduction
The saying that data is the new oil/gold/bacon (whatever floats your boat) is quite popular now. Data people are finally getting recognition for their work and effort. It wasn't like that even a short while ago. If you work in data, you know what I'm talking about: Excel monkeys, SQL writers; not many people took this field seriously. With companies becoming data-driven (or at least wannabe data-driven), data people are getting long-deserved recognition.
But what if data quality is not checked and bad data flows into data lakes, warehouses, and lakehouses? As the excellent saying goes, "garbage in — garbage out." You might notice that data quality, together with data reliability, is also becoming a scorching topic. In this blog post, I plan to cover a hypothetical scenario with some known/unknown tools and provide some insight into what's out there on the market. Note! Most SaaS tools don't have a playground option (or some amount of credits for testing them out) like, e.g., Snowflake does, so I will just mention them.
Keep in mind that I'll focus more on open-source technologies, simply because it's easier to grasp the basics of their capabilities and check how they work by playing around with them on your own data. With SaaS products, you might have to sign some papers/contracts, etc., and only then can you fully check them out. Hence why I just list some companies at the end.
Setup
Data warehouse — Snowflake. I created a stage in Snowflake with credentials embedded to make it work, so there is no need for separate authentication on each load. I use it daily at work and have noticed some neat features: bulk copy, horizontal scaling, fast vertical scale-up, time travel.
I've used Airbyte in one of my previous posts, and it was a bit slow, so I will NOT use it (let's imagine the data is already in place; getting it there is not the point of this blog post). Snowflake's COPY INTO option beats it by far, so we will use AWS S3 as the storage layer, which plays nicely with ingesting the data.
dbt — I did some playing around with the tool and wrote a post about it, and now I can't live without using dbt for transformations!
Scheduler — Airflow. I use it extensively, really love the tool, and know my way around it. I led the "One Scheduler to Rule Them All" initiative at HomeToGo; you can read about it in (surprise!) my blog post.
Data I will be using — Yellow taxi trip data. I extracted some dimension lookup tables from there too.
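Since the Snowflake stage mentioned above does the heavy lifting for ingestion, here is a minimal sketch of what creating it and bulk-loading from it could look like. The stage name, bucket, and file pattern are hypothetical, and in a real setup a storage integration beats embedding raw keys:
CREATE STAGE IF NOT EXISTS staging.s3_taxi_stage
  URL = 's3://my-taxi-bucket/yellow/'
  CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
  FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);
-- bulk-load one month of trips straight from the stage
COPY INTO staging.fact_yellow_taxi_data
  FROM @staging.s3_taxi_stage
  PATTERN = '.*yellow_tripdata_2021-01.*';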
Ingestion
Very similarly to the Airbyte piece, I'm loading things from the S3 layer. Because my DWH solution is in the cloud, I don't need to download the data to my Docker image and load it from there. So I re-used similar "create if not exists" logic on the staging table so that the first run succeeds:
CREATE TABLE IF NOT EXISTS staging.fact_yellow_taxi_data (
vendor_id int,
tpep_pickup_datetime timestamp,
tpep_dropoff_datetime timestamp,
passenger_count int,
trip_distance DOUBLE PRECISION,
rate_code_id int,
store_and_fwd_flag char(1),
pu_location_id int,
do_location_id int,
payment_type int,
fare_amount DOUBLE PRECISION,
extra DOUBLE PRECISION,
mta_tax DOUBLE PRECISION,
tip_amount DOUBLE PRECISION,
tolls_amount DOUBLE PRECISION,
improvement_surcharge DOUBLE PRECISION,
total_amount DOUBLE PRECISION,
congestion_surcharge DOUBLE PRECISION
)
In order not to write on top of what's already in our staging table, we truncate it beforehand, so we don't have any leftovers, and only then ingest the data from the files.
The first two tasks are simple: a SnowflakeOperator (which just runs SQL statements) for the truncate, and an S3ToSnowflakeOperator for the load. File names follow a straightforward structure, so I'll leverage Airflow macros, as sketched below:
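A minimal sketch of those two tasks, assuming the Airflow Snowflake provider package; the connection ID, stage name, and file-name pattern are my stand-ins for the real values:
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.snowflake.transfers.s3_to_snowflake import S3ToSnowflakeOperator

# task 1: empty the staging table so reruns don't pile leftovers on top
truncate_staging = SnowflakeOperator(
    task_id="truncate_staging",
    snowflake_conn_id="snowflake_default",
    sql="truncate table if exists staging.fact_yellow_taxi_data",
)

# task 2: COPY INTO from the stage; {{ ds[:7] }} renders the execution
# date as YYYY-MM, matching the monthly file names
load_from_s3 = S3ToSnowflakeOperator(
    task_id="load_from_s3",
    snowflake_conn_id="snowflake_default",
    schema="staging",
    table="fact_yellow_taxi_data",
    stage="s3_taxi_stage",
    s3_keys=["yellow_tripdata_{{ ds[:7] }}.csv"],
    file_format="(type = csv skip_header = 1)",
)

truncate_staging >> load_from_s3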
Now, to load the seeds (static dimensions), I placed them as CSVs in the seeds folder, and to create them, I had to run the dbt seed command, so I'm using a BashOperator, as sketched below. change_dir is the command that changes the working directory to our dbt folder.
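A sketch of that task; the dbt project path is a made-up example:
from airflow.operators.bash import BashOperator

# assumption: wherever your dbt project lives inside the Airflow image
change_dir = "cd /opt/airflow/dbt_project"

dbt_seed = BashOperator(
    task_id="dbt_seed",
    bash_command=f"{change_dir} && dbt seed",
)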
Transformation
At this moment, we can try to load the data out of staging into our DWH schema. Since we're loading data for non-overlapping periods, and to keep the load idempotent, we should delete data for the same period before loading it. For this, I'm going to borrow the macro we use at work, called "delete_if_incremental":
{% macro delete_if_incremental(where_clause) %}
    {% set query %}
        delete from {{ this }}
        where {{ where_clause }}
    {% endset %}
    {% if is_incremental() %}
        {% do run_query(query) %}
    {% endif %}
{% endmacro %}
Why is it needed? If we need to reload some faulty data, we don't want to end up with duplicates; one way of dealing with that is to delete the period you're inserting. If the data is there, no duplication; if it's not there, no harm done.
The delete query is executed on the model table only on an incremental run; otherwise the table is empty or non-existent. If we didn't add this condition, we'd get an error that the table doesn't exist, which is true for the first run or when backfilling it.
The model itself is pretty straightforward: we insert all columns as-is, and for a bit of metadata, I add the month I'm loading and a timestamp of when the data was loaded (timestamp_ntz type).
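A sketch of what such a model could look like; the load_month variable and the source definition are my assumptions for illustration:
{{ config(materialized='incremental') }}

-- delete the period we're about to insert so reruns stay idempotent
{{ delete_if_incremental("load_month = '" ~ var('load_month') ~ "'") }}

select
    t.*,
    '{{ var("load_month") }}'::date as load_month,
    current_timestamp()::timestamp_ntz as loaded_at
from {{ source('staging', 'fact_yellow_taxi_data') }} as t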
Data Quality
Let's get to the good part! Let's set up some basic tests on relationships and assert some assumptions. dbt tests or dbt_expectations can do a lot to help you understand the data and decide whether you need to act on it.
Initial assumptions (sketched as dbt tests below):
amounts and fares/surcharges have to be greater than or equal to 0
vendor_id, rate codes, and payment types should match what's in the seeds
distances, pickup and drop-off times, and location IDs technically should not be null
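A sketch of these assumptions as a schema.yml; the accepted_range test comes from the dbt_utils package, and the seed name vendor_lookup is hypothetical:
version: 2
models:
  - name: fact_yellow_taxi_data
    columns:
      - name: total_amount
        tests:
          - dbt_utils.accepted_range:  # amounts must be >= 0
              min_value: 0
      - name: vendor_id
        tests:
          - relationships:             # must match the seed values
              to: ref('vendor_lookup')
              field: vendor_id
      - name: trip_distance
        tests:
          - not_null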
Running the tests already showed that some values are not as I expected and might be null. Apparently, for some reason, the amount columns can be negative. We also have vendor_id 4 and rate_code_id 99, which weren't documented in the files. So I'm adjusting the seeds to have those marked as UNKNOWN.
So imagine a very likely situation: you get a description from your product manager, or from a company you're consulting for, of what the file and its possible values will be. You build a process on top of it, and you immediately get adorable issues like the ones I got here. With basic data validation rules, you can catch them early.
How not to mess up the production data later?
Option 1: Instead of testing the final table, we could move these tests to the source and ensure that the source data is OK. This would require some additional manipulation when loading and would result in an ETL approach.
Option 2: Continue with this process, but use the Snowflake clone option. Run the tests; if they pass, clone the data to its final place. This will be ELT; end-users will see only cleansed/correct data. Cloning the table won't add much complexity, and we won't pollute the user-facing data.
Let's go for option 2. For this, we should use a different schema for all our final tables; let's call it DWH. Into this schema, we're going to move our seeds (static dimensions) and a cloned version of the fact table.
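The promotion step itself can be a single statement, run only after the tests pass; "analytics" here is a stand-in for whatever schema dbt builds the model in:
CREATE OR REPLACE TABLE dwh.fact_yellow_taxi_data
  CLONE analytics.fact_yellow_taxi_data;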
Remember that cloning will not work out of the box: dbt creates transient tables on Snowflake by default, and a transient table cannot be cloned into a permanent one, so you have to account for the table type. Adjusting dbt_project.yml will do the trick; otherwise you'll run into:
SQL compilation error: Transient object cannot be cloned to a permanent object.
How I did it in this case (a sketch of the kind of adjustment; "taxi_project" is a hypothetical project name):
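# dbt_project.yml: build the models as permanent tables so they can be
# cloned into a permanent copy in the DWH schema
models:
  taxi_project:
    +transient: false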
If you want to read more about the nuances of transient tables, you can check it out here.
Why do I see this method as superior?
Leveraging Snowflake's cloning abilities, we're just adding references to the existing underlying files, so the additional storage in this case is minimal (almost 0). If we're not on Snowflake, we can copy the table as-is to the final schema.
Using a different schema, we ensure that we replace the user-facing data only once it's verified and its quality ensured. Imagine a scenario where something breaks in the pipe and we also mess up historical periods. End users would see garbage and would ask a lot about what happened. Data recovery might take a while, and if you don't have backups, let's just say I wouldn't want to be in your shoes in this situation.
With this approach, the user-facing data is not messed up. It can be used for easier recovery as well. And you can inform everyone that there are issues with the pipeline while none of the reporting is affected.
Data Reliability
We've already got the basic checks done. But what happens if we get abnormally high volumes, or anomalies in the data itself? What do we do when schema changes break our pipelines: a column type changed, a column removed?
To tackle this, we can check some tools and see how they work in our scenario, and integrate them into our pipelines.
Re_data
I assume the "re" stands for reliability. A super neat dbt package; you can check out their web page.
The tool checks for anomalies in your data. You have options to specify what to monitor and check or you can let the tool do it all.
You can follow their tutorial (though it was a bit confusing at first; I got my head around it only after thoroughly reading the Reference part, especially the config section).
TL;DR: add re_data_monitored=true to the model config, plus a re_data_time_filter column if you have one, e.g. (a sketch, using the taxi pickup timestamp as the time filter):
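{{
    config(
        re_data_monitored=true,
        re_data_time_filter='tpep_pickup_datetime'
    )
}}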
and add a BashOperator to execute
re_data run --start-date {{ds}} --end-date {{next_ds}} --interval days:1
This will run checks for each day (based on the time filter column) against the source. You can specify different periods, e.g., 7 or 30 days, if you have some seasonality. What's also nice is that besides anomalies in your data, it also checks for schema changes.
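Wired into Airflow, that's one more BashOperator; a sketch, reusing the hypothetical dbt project path from before:
from airflow.operators.bash import BashOperator

# {{ ds }} / {{ next_ds }} pass the DAG's execution window to re_data
re_data_check = BashOperator(
    task_id="re_data_check",
    bash_command=(
        "cd /opt/airflow/dbt_project && "
        "re_data run --start-date {{ ds }} --end-date {{ next_ds }} --interval days:1"
    ),
)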
In my run, re_data picked up a schema change on the model, which can alert us in Slack. Quite helpful if you're relying on that data source.
Other players in the field
Elementary data — a relatively fresh startup from Israel. They focus on table changes and data lineage (they plan to roll out column-level lineage). You can check out their video demo of it here. Basically, they're extending (altering) dbt YAML files with additional information and then visualizing it in their UI. When I talked to the company's founders, I discovered that they started with data lineage, but since they also have this monitoring capability, I mention them here too.
Anomalo — a startup providing a SaaS solution for anomaly detection. A fully dedicated tool for data quality and reliability. This blog post can give you insight into how HomeToGo uses it in production.
Sifflet — I had a chat with their co-founder & CEO about the tool, but when I mentioned that we're using Anomalo and are happy with it, there wasn't much left to talk about. It seems that their tool does similar things to Anomalo. It also provides data lineage and discovery capabilities, which is a nice bonus.
Datafold — looks like a tool that does it all: data reliability + column-level lineage. From trying out their demo, it looks like they're using some taxi data too 😅
Monte Carlo Data — in reviews, it looks like quite a mature tool with multiple integrations. Based on the blog posts they publish, it's pretty obvious that they went through the same pains and know what problems need solving.
Anodot — back in my time at Wix.com, we had a PoC with this tool for anomaly detection. This one focuses on real-time data, but with Kappa architecture gaining traction, it might be a solution to your problems.
In-house built tools — you can create a tool using Prophet, Kats, Orbit, or simple statistical libraries. The question is, how keen are you on maintaining them and adjusting them as you go?
If I missed any — please let me know in the comments! Glad to check them out too.
Summary
If you want to use open-source software, Re_data is a perfect candidate. It works nicely on top of your dbt flows. They also have their own UI, which I didn't explore here, where you can check logs about the tests and checks you ran, including dbt tests! I see they're doing an excellent job and starting to look like competition for the SaaS tools. Let's see what will be released in their SaaS version, and when (I doubt this tool will remain open-source only).
If you're looking for SaaS software, there are plenty of options out there. Do PoCs with different vendors (all of them are keen to get you as a customer), and you might get a trial run on your data to evaluate their capabilities to the full extent.