
    Engineering

    Lessons Learned from Building ZenSearch, Part II: Running Async Tasks

    How we moved from APScheduler in Flask to Celery on ECS for reliable, highly scalable async task processing

Matt

    8/19/24

    Introduction

    In the last post in this series, I wrote in a little more detail about why I began building ZenSearch. In this post, I'll talk about how we handle async work, and how the async task processing stack has evolved over time as it approached its first "stable" state. Future posts will talk about how the backend and frontend have evolved over time, and decisions I wish I'd made from the get-go.

    The Initial Stack

    The initial stack backing ZenSearch was basically:

    1. A Flask server, running on a DigitalOcean App Platform box for about $5 / month.
    2. A DigitalOcean-hosted Postgres database, for about $12 / month.

And that was pretty much it. Flask would serve some rendered HTML templates and process form requests (think someone updating which companies they want to see jobs at in a basically unstyled HTML dropdown), and we used flask-apscheduler to run the jobs that collected and processed postings and sent the notification emails. The whole app cost less than $20 / month.
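For the curious, the scheduling piece looked roughly like the sketch below. It uses flask-apscheduler's decorator API; the job name, schedule, and function body are made up for illustration, not our actual config.

```python
from flask import Flask
from flask_apscheduler import APScheduler

app = Flask(__name__)

scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()

# Illustrative job name and schedule -- not our actual config.
@scheduler.task("cron", id="ingest_postings", hour=6)
def ingest_postings():
    # Collect and process postings, then send notification emails,
    # all inside the same process that serves web requests.
    ...
```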

    Scaling

    At the time, we only had about 20 companies in the database, and they were ones that I was interested in (the app was initially for me, after all). This meant that running posting collection and parsing in a background process on the same App Platform app that was running the web server worked fine. If we, for example, ran out of memory, the box would crash, the server would restart, and all would be well after a few minutes of downtime. This was fine for me - I didn't care if the app was down periodically as long as I got my notification emails.

    This changed as we started getting more users, and, most importantly, adding new companies to the database. Over the past year or so, we've gone from 20 companies in the database to about 10,000, and we now have about 1.5 million postings (~300k open) as well.

    For 20 companies, running posting ingestion tasks in a background process was fine. If each one took a minute or so, we'd need 20 minutes of background process time, and all was well. Once we hit a few hundred companies, this began to change quickly. It became apparent that we needed dedicated tooling for processing background tasks.

    For that, we reached for Celery.

    Celery

We introduced Celery (with Redis as its broker and result backend) last December and farmed our async task processing out to it instead of running it on the main app webserver. This meant spinning up three new services in DigitalOcean: one for the Celery workers themselves, one for Beat, which is essentially a cron scheduler for Celery, and a Redis instance to serve as the broker and result backend.
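As a rough sketch of how those pieces fit together (the broker URLs, task name, and schedule below are illustrative, not our real config), Beat reads a schedule like this and drops tasks onto the broker on time, and the Celery workers consume them:

```python
from celery import Celery
from celery.schedules import crontab

# Placeholder broker/backend URLs.
app = Celery("zensearch", broker="redis://redis:6379/0", backend="redis://redis:6379/1")

# Beat publishes these tasks to the broker on schedule; workers pick them up.
app.conf.beat_schedule = {
    "ingest-postings-daily": {
        "task": "tasks.ingest_all_companies",   # hypothetical task name
        "schedule": crontab(hour=6, minute=0),  # once a day; time illustrative
    },
}
```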

    This new async task processing rig more than tripled our costs overnight - we were now paying something like $10 / month for Celery, $5 for Beat, and $30 for Redis.

    The benefit, though, was that introducing Celery let us scale up significantly. All of a sudden, we were capable of processing jobs for hundreds (soon to be thousands) of companies without needing to worry about taking down our main webserver. We could also scale Celery independently of the main server, which has been beneficial to us over time, since today we're processing tens of thousands of async tasks per day.

    Growing Pains

Over time, we ran into more headaches. As we went from a few hundred companies to a few thousand, we started hitting a few issues we could've foreseen, but had decided to solve only once we actually ran into them. First, having thousands of companies in the database means thousands of tasks to ingest job postings. For reasons beyond the scope of this post, these ingestion tasks can take anywhere from fifteen seconds to a few minutes, depending on the company. The distribution is heavily right-skewed (it has a long tail), but the p50 runtime for a task to ingest new postings into the database was around 30 seconds. If you do the back-of-the-napkin math, you quickly realize that at about 3,000 companies (one task per company per day), we'd run out of time in the day trying to process each company sequentially.
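The napkin math, spelled out:

```python
seconds_per_day = 24 * 60 * 60    # 86,400
p50_runtime_seconds = 30          # rough p50 per-company ingestion time
print(seconds_per_day // p50_runtime_seconds)  # 2880 companies/day, processed sequentially
```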

    At the same time, allowing for concurrency gets expensive too. If we try to scale up Celery to run more concurrent processes, we start hitting memory and CPU caps imposed by DigitalOcean.

    The initial solution to this was to scale up the size of the box we were running Celery on, but it became clear pretty quickly that this wasn't a solution that would work over the longer term. We'd likely be paying over $100 / month, which is pennies to a corporation, but wasn't something I wanted to fund out of pocket.

    Migrating Celery to ECS

    At this point, it started to become clear that the better strategy for running Celery would be to add a couple of cost-saving (and sometimes performance-improving) measures.

    First, we migrated off of DigitalOcean to Amazon ECS. There were a few reasons for this, but the main one was that ECS allows for autoscaling based on the depth of the broker queue, meaning we could run our posting ingestion tasks, scale up arbitrarily, and then shut off our instances once we'd processed all the tasks. This allows us to run our tasks on bigger boxes (with more concurrency) without needing to keep the boxes on all the time.
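The mechanism, roughly (this is a sketch, not our exact setup): publish the broker's queue depth as a custom CloudWatch metric, then point an ECS Application Auto Scaling policy at it so the worker count tracks the backlog. The URL, credentials, and metric names below are placeholders, and the management API shown is RabbitMQ's, which we get to in a moment.

```python
import boto3
import requests

# Placeholder URL for RabbitMQ's management API; %2F is the default vhost,
# "celery" is the default queue name.
QUEUE_API = "https://rabbitmq.example.com/api/queues/%2F/celery"

def publish_queue_depth():
    resp = requests.get(QUEUE_API, auth=("user", "password"), timeout=10)
    resp.raise_for_status()
    depth = resp.json()["messages"]  # messages currently sitting in the queue

    # An ECS Application Auto Scaling policy can scale the worker service on this metric.
    boto3.client("cloudwatch").put_metric_data(
        Namespace="ZenSearch/Celery",
        MetricData=[{"MetricName": "BrokerQueueDepth", "Value": depth, "Unit": "Count"}],
    )

if __name__ == "__main__":
    publish_queue_depth()
```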

    The other major benefit of moving to ECS is the ability to use Fargate Spot instances, which often cost less than half as much as on-demand ones. Running spot instances has saved us a significant amount of money.
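For illustration, this is roughly how a worker service ends up on Spot capacity; the cluster, service, task, subnet, and weight values are all placeholders rather than our real configuration:

```python
import boto3

ecs = boto3.client("ecs")
ecs.create_service(
    cluster="zensearch",                  # placeholder cluster name
    serviceName="celery-worker",
    taskDefinition="celery-worker:1",
    desiredCount=0,                       # autoscaling raises this when the queue fills up
    capacityProviderStrategy=[
        {"capacityProvider": "FARGATE_SPOT", "weight": 4},
        {"capacityProvider": "FARGATE", "weight": 1},   # small on-demand fallback
    ],
    networkConfiguration={
        "awsvpcConfiguration": {
            "subnets": ["subnet-0123456789abcdef0"],      # placeholder
            "securityGroups": ["sg-0123456789abcdef0"],   # placeholder
        }
    },
)
```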

At the same time, we migrated from Redis to RabbitMQ as the broker for Celery. We really only needed a message broker, and Rabbit was far cheaper and came with a nice management UI for monitoring tasks being picked up and completed.
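The swap itself is mostly a one-line change on the Celery side. A minimal sketch, with placeholder hostname and credentials:

```python
from celery import Celery

# Placeholder AMQP URL for the RabbitMQ broker.
app = Celery("zensearch", broker="amqp://celery_user:password@rabbitmq.example.com:5672//")
app.conf.task_ignore_result = True  # we only need a broker, not a result backend
```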

All told, running Celery today costs us about $15 / month.

    Celery Headaches and Lessons Learned

There have been a few issues with Celery that I've run into and spent a fair amount of time debugging, and that taught me some lessons the hard way, so to speak.

    The first is that in general, it's beneficial to make tasks small, idempotent pieces of work whenever possible. For example, instead of one task that loops through 10,000 companies and collects postings at each of them, write a task that does it for each individual company, and then trigger each of them. Long-running tasks can cause problems with retries, workers dying, restarts from failure, and so on.
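In Celery terms, that looks something like the sketch below; the helper functions and task names are made up for illustration:

```python
from celery import Celery

app = Celery("zensearch", broker="amqp://localhost//")

@app.task
def ingest_company_postings(company_id: int) -> None:
    # Small and idempotent: re-running this upserts postings for one company
    # rather than duplicating them.
    postings = fetch_postings(company_id)    # hypothetical helper
    upsert_postings(company_id, postings)    # hypothetical helper

@app.task
def ingest_all_companies() -> None:
    # Instead of one long task looping over 10,000 companies,
    # enqueue one small task per company.
    for company_id in get_all_company_ids():  # hypothetical helper
        ingest_company_postings.delay(company_id)
```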

    For instance, Celery by default will acknowledge (ack) a task when it's picked up from the broker, so if you have a long-running task that dies halfway through, Celery will generally think it's completed.

Another footgun was that with SQS, there's a broker visibility timeout that you (the Celery user) need to set. For instance, you can set this timeout to 30 seconds, which means that if a Celery worker picks up a task and you have (e.g.) task_acks_late=True set, then if that task does not complete within 30 seconds, it will be re-enqueued and re-processed. This means that a long-running, non-idempotent task could be retried over and over, which might (hypothetically) result in ten emails going out to a single user one morning instead of just one.
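The relevant knobs look like this (values illustrative). The visibility timeout has to comfortably exceed your slowest task, or late-acked tasks get redelivered while the first run is still going:

```python
app.conf.task_acks_late = True   # don't ack until the task actually finishes
app.conf.broker_transport_options = {
    # SQS visibility timeout, in seconds; must be longer than the slowest task,
    # otherwise the message is redelivered while the original run is still in flight.
    "visibility_timeout": 3600,
}
```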

In general, writing tasks that can be rerun and that finish quickly is a good way to combat some of these issues. We've made heavy use of techniques like fanout (where a single task enqueues a number of other tasks to run concurrently) and pagination (also sometimes known as batching, where a task re-enqueues itself with a "next page" pointer, such as an incremented offset) to break what would otherwise be long-running tasks into many small ones with no loss of throughput. This is extremely useful for things like indexing a large number of records into OpenSearch (more on this in a future post), where paginating through the records keeps individual task run times down, or for running a large number of posting ingestion tasks concurrently via fanout while keeping each process responsible for one company at a time.
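The pagination pattern, sketched out (the batch size and helper names are assumptions):

```python
BATCH_SIZE = 500  # illustrative

@app.task
def index_postings(offset: int = 0) -> None:
    batch = load_postings(offset, BATCH_SIZE)   # hypothetical helper
    if not batch:
        return                                   # nothing left to index; stop re-enqueueing
    index_into_opensearch(batch)                 # hypothetical helper
    # Hand the next page off to a fresh task so each run stays short.
    index_postings.delay(offset + BATCH_SIZE)
```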

The last problem we've repeatedly run into is a well-known one among Celery users: Celery's lack of async support. For our tasks, we would (if we could) make heavy use of async / await, since so much of what we're doing is I/O-bound (collecting job postings). However, since Celery does not support async, we instead need to use Celery's concurrency mechanisms to improve worker throughput. Largely, this has worked fine, but we did periodically run into issues when using the gevent pool, where we'd get errors like this:

```
Couldn't ack 8360, reason:"SSLEOFError(8, 'EOF occurred in violation of protocol (_ssl.c:2423)')"
```

    This would cause our Celery workers to hang without actually crashing, which would mean no tasks would be processed until we restarted the worker. As a workaround, we switched to using the threads pool instead of gevent.
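Configuration-wise, the workaround amounts to the snippet below (equivalent to passing --pool=threads and --concurrency on the worker command line); the concurrency value is illustrative:

```python
app.conf.worker_pool = "threads"
app.conf.worker_concurrency = 32   # illustrative; tune for your I/O-bound workload
```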

    Doing It Again

If I could do it again, I'd definitely make some choices differently early on. With the experience I have now, I would've reached straight for Celery instead of APScheduler. APScheduler worked well and was easy to set up with the knowledge I had at the time, but knowing what I know now, Celery is very straightforward to orchestrate (at least when you're only using a few workers, have a single broker, are running on a managed service like ECS, etc.), and going in that direction right away would've saved a lot of refactoring.

There have also been a few side quests along the way that I probably wouldn't repeat, such as using Redis and then SQS as the broker for Celery before ultimately settling on Rabbit.

One other viable option I never considered would have been something even simpler: using a tool like PGQueuer early on to keep the stack minimal. Since we already rely heavily on Postgres, this would've required no new broker infrastructure, and since our needs are pretty straightforward (basically just running cron jobs periodically), it likely would've worked just fine.

    Up Next

    In the next post, I'll discuss some things I've learned over time about making frontend code more friendly to work with, and choices I wish I'd made earlier on. Stay tuned!