This post aims to cover a specific use case we had to do some time series forecasting, how it went and in the end, how we optimised/adjusted it.
The structure of this post:
Short intro to Snowpark
Infrastructure overview
Use case introduction
Snowpark solution
Final solution
Summary
Happy reading!
Short intro to Snowpark
Snowpark is a new developer experience for Snowflake that allows developers to write code in their preferred language and run that code directly on Snowflake. It exposes new interfaces for development in Python, Scala, or Java to supplement Snowflake’s original SQL interface.
Technically this feature tries to become a competition to Apache Spark, not going out of the Snowflakes ecosystem, allowing you to write additional user-defined functions (user-defined table functions, too) in your preferred language that you can later use in your SQL code. As this is a highly marketed feature of Snowflake, it doesn’t make sense for me to go over what’s done there or how it's working, so I’ll skip that. If you’re interested you can read more on Snowflake’s page.
Infrastructure overview
Talking about infrastructure here might not make sense, but it’s a crucial component in the story and how we came across some solutions on our hands. I’m not particularly eager to overcomplicate infrastructure since it creates headaches later, so our choices lead to a simple yet effective combination of tooling to ensure everything runs smoothly. Choosing more managed services makes sense when the team is not that big, so adding more infrastructure maintenance overhead instead of delivering solutions doesn’t make sense. So basically, we have chosen tools for the job that currently satisfies our needs.
Use case introduction
When this post was written, I had a task at the company I was working to predict the number of transactions coming through our system. This is one of our measured KPIs that gives a simple good sense of how we’re doing.
Since it seemed like a straightforward time-series forecasting task, I was lazy and knew AutoML libraries existed - without hesitation, I went with Prophet. If you’re not familiar with the library, you can read more about it here. Again, I prefer to go with simple but effective solutions, and it made a lot of sense to try Prophet properly here.
The flow for the forecasting should look like the majority of forecasting flows. We prepare the data with dbt, and we somehow fit and make predictions and push results to Snowflake for later downstream consumption.
If we’d look back at the infrastructure, we have several options to add this fit-predict step.
Add an EC2 instance and push the code execution there
Maybe SageMaker (I haven’t used it, so not sure if it’s an actual use case, but it went through my mind as a possible solution)
MWAA - create python tasks for this specifically
Try out Snowpark
Solution #1
So at the start, I dropped 1st and 2nd options because they added more complexity, where I didn’t see the need for them. No-brainer here.
It was a hard choice between MWAA and Snowpark, and I went with Snowpark just because there was an option to try something new and shiny (without any comparison tests 😐, this will bite me later in the story).
So we created a simple UDTF that fits our needs.
create or replace function data_warehouse.core.prophetize_daily(category_id string, y float, dt date, periods number)
returns table(id string, dt string, y float, yhat_lower float, yhat_upper float)
language python runtime_version = 3.8
packages = ('prophet')
handler = 'ProphetHandler'
as $$
import pandas as pd
from prophet import Prophet
class ProphetHandler:
def __init__(self):
self._dates = []
self._values = []
def process(self, category_id, y, dt, periods):
self._dates.append(dt)
self._values.append(y)
self._id = category_id
self._periods = periods
return ((category_id, dt, y, 0, 0), )
def end_partition(self):
df = pd.DataFrame(list(zip(self._dates, self._values)),
columns =['ds', 'y'])
model = Prophet(daily_seasonality=True, weekly_seasonality=True)
model.add_country_holidays(country_name='LT')
model.fit(df)
future_df = model.make_future_dataframe(
periods=self._periods,
include_history=False)
forecast = model.predict(future_df)
for row in forecast.itertuples():
yield(self._id, row.ds, row.yhat, row.yhat_lower, row.yhat_upper)
$$
;
So having this function available in SQL and making it generalized, we could re-use it in similar scenarios but on different granularity, groupings, etc.
We pushed it to production and let it run for 4-5 months like this till a couple of weeks ago.
Dbt model run on Snowflake XS Compute engine took around 13-14 minutes. At that time, it seemed like a simple solution with not that big of a cost.
Final Solution
At some point, something wasn’t right; we started getting some error messages that seemed very strange and new. We’ve reached out to Snowflake Support, and a ticket was created. This isn’t a business-critical KPI, so it’s OK to have it not updated for a bit, but still, it bothered me a lot, so I’ve decided to benchmark the MWAA solution.
I’ve spent a couple of hours rewriting some processing parts to be more efficient and leveraging Airflow's Dynamic Task Mapping feature. Keep in mind that this wasn’t available when we were choosing the Snowpark option since MWAA then had only 2.2.2 available. The dynamic task mapping feature came with 2.3.0; now we’re using 2.4.3, and there is already 2.5.1 available. MWAA catching up with Airflow versions allowed many of these things to help with performance and efficiency!
So long story short, we’ve ended up with a flow like this:
This dag is triggered by Data-Aware scheduling (this feature was introduced in the 2.4 version).
We truncate our end table, split our flow into 32 groups and pass them to our forecasting step, where we loop over each specific customer and predict their results and push it to Snowflake; when all 32 partitions are processed, we mark that Dataset was updated and we trigger downstream dependencies.
You might ask why 32. It’s not a magic number; in Airflow, there is a setting max_active_tasks_per_dag that, by default, is set to 16. In our case, it’s more than enough, and going with the practices I’ve learned with Apache Spark with executors, parallelism and partitions counts, I’ve chosen 16x2 (2 being a multiplier). So this approach should help with scale too.
Note: if you’d put the update dataset option on Dynamically Mapped Task, each time one task is updated, it would trigger downstream dependencies. So, in this case, it would trigger downstream DAGs 32 times - hence the Empty Operator at the end.
If we’d check the run times of this approach for precisely the same processing, we have this whole dag running for 2-3 minutes.
Summary
Not everything new and shiny is the best tool for your specific task. Don’t go chasing waterfalls. Same with new tech, try it out but be smart about it. Let’s compare both approaches:
Complexity: In MWAA, all abstractions are made in python; if any analyst wants to do some forecasting, they must know Python. Snowpark gives a neat SQL abstraction that lets you apply it quickly and efficiently wherever you want.
Debugging: Coming from the engineering side, I’m more than used to working with IDE and using all debugging capabilities it provides. Moving to Snowpark takes that out of me. I thought a month or so ago, they released Python workbooks in Snowsight that should allow you to do something similar, but I haven’t tried, so this might be a biased comparison.
Cost: We’re running medium instances on MWAA, which is 0.79$ per hour per worker. It was taking max 3 minutes for a DAG compared with 13-14mins on Snowflake on XS (XS price on Standard 2.6$, Enterprise 3.9$ with on-demand option). The difference is quite significant. Time spent is almost x5 bigger on Snowpark, let’s add the cost difference, and it’s even more significant. Just because I might not be assuming correctly MWAA price with parallelism involved with Dynamic Task Mapping, I won’t start with cost savings done.
Think carefully and choose responsibly. The snowpark has its use cases but is not always the right tool. Don’t hammer screws into a wall when you can use a screwdriver!