How to Queue and Delay Python Jobs With RQ and Redis
Get the project source code below, and follow along with the lesson material.
Download Project Source CodeTo set up the project on your local machine, please follow the directions provided in the README.md
file. If you run into any issues with running the project source code, then feel free to reach out to the author in the course's Discord channel.
Delayed Jobs
All of the Python code we have written so far has always been executing in the context of directly responding to an incoming web request. If we wanted to anything run anything that wasn't strictly necessary to render a page to user, (for example sending a confirmation email, or updating internal analytics), we would have to do it in a route and do it before we return a response.
If the code we are thinking of running could take a while to complete (like running a complicated query or talking to a slow API), our users would have to wait for that slow code to complete before being able to get a response.If the code might take longer than 60 seconds to run, we might not even be able to respond in time before the HTTP server in front of Flask cuts us off.
The goal of our web controller code should be focused on rendering a response to the end user, and any ancillary tasks like sending an confirmation email, would ideally be executed later in separate environment. To do that, we can setup a job queue and have a separate process act as "worker" to listen to the queue and execute the code.
To create the queue and workers, we'll use a Python library called RQ
(which stands for Redis Queue). If you have used other Python frameworks, RQ
is similar to Celery
task queue, but simpler to work with. As you can infer from the name, this will also mean we will need to install and serve redis
which is a fast in-memory data store that will store our queue.
Setting up Redis
Installing Redis
To install redis on OSX run: brew install redis
and then run brew services start redis
On Windows (using WSL) and Linux:
sudo apt-get install redis-server
sudo service redis-server start
Installing RQ
Then we will add rq
, a Flask Extension called Flask-RQ2
, and a tool called rq-dashboard
to our requirements.txt file.
Flask-RQ2
rq==1.4.3
rq-dashboard
(due to a testing incompatibility, we temporarily need to pin our rq
version to 1.4.3)
Make sure your virtual environment is activate, then run pip install -r requirements.txt
to install the two libraries.
Within yumroad/config.py
, we need to tell RQ where to find redis, so we will add a configuration variable.
class BaseConfig:
...
REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
RQ_REDIS_URL = REDIS_URL
RQ_DASHBOARD_REDIS_URL = RQ_REDIS_URL
In our test environment, we don't want to delay tests and we do not even need to connect to redis, so under the test configuration, we will disable asynchronous job processing by adding RQ_ASYNC
to False
and change the type of connection.
class TestConfig:
...
RQ_ASYNC = False
RQ_CONNECTION_CLASS = 'fakeredis.FakeStrictRedis'
Within extensions.py
we can import RQ from flask-RQ2
and initialize it.
from flask_rq2 import RQ
...
rq2 = RQ()
Then within yumroad/__init__.py
, import rq2
from yumroad.extensions import ( ... , rq2)
def create_app(environment_name='dev'):
...
rq2.init_app(app)
...
Configuring & Processing Jobs
Using Flask-RQ2
we can designate specific functions as jobs using a decorator.
@rq2.job
def average(x, y):
print("I am running")
return (x + y)/2
Then to invoke the function, you can queue it up by using a queue
method.
In this case it would be average.queue(1, 2)
.
To see this in action, we can try it out from the Flask shell. For the sake of example, lets add this function called average
to extensions.py
. We can invoke it using RQ2 in by running flask shell
.
>>> from yumroad.extensions import average
>>> average(1, 2)
I am running
1.5
>>> job = average.queue(1, 2)
>>> job
FlaskJob('375200f3-d380-4822-94d4-c18b6d88e914', enqueued_at=datetime.datetime(2020, 7, 27, 1, 31, 22, 156183))
To launch a worker, in a separate terminal session, with your virtual activated, run flask rq worker
.
This will result in the following output.
$ flask rq worker
18:31:39 Worker rq:worker:a81e81d9f5104b91bd43ed58b4522aa7: started, version 1.5.0
18:31:39 *** Listening on default...
18:31:39 default: yumroad.extensions.average(1, 2) (375200f3-d380-4822-94d4-c18b6d88e914)
I am running
18:31:39 default: Job OK (375200f3-d380-4822-94d4-c18b6d88e914)
18:31:39 Result is kept for 500 seconds
If we wanted to later lookup the result, we can do so by having RQ2 fetch the result by getting the job by the job id.
>>> from yumroad.extensions import rq2
>>> job = rq2.get_queue().fetch_job('375200f3-d380-4822-94d4-c18b6d88e914')
>>> job.result
1.5
By default, jobs go into the default queue, but we can control what queues job go into to get more precision and control how many workers operate on which queue.
job = average.average.queue(3, 4, queue='important_math', timeout=60 * 5)
In this case, we would also need to tell our workers to listen to the important_math
queue.
$ flask rq worker important_math default
Scheduling Jobs
To schedule jobs, you can use the built in scheduler in RQ.
average.schedule(timedelta(seconds=60), 1, 2)
average.schedule(datetime(2020, 4, 25, 11, 59, 59), 1, 2) # UTC
In addition to running a worker listening on queues, you would want to run the rq scheduler
task so that RQ can keep an eye out for new jobs and queue them up when the time comes.
To run a scheduler, run the following command as well (in the background or in a different terminal window)
$ flask rq scheduler
Using a Job Queue for Emails
The process of sending an email is something that doesn't need to happen in order to render a page to users. In fact, if our mail server has downtime or is slow, that might result in the user seeing an error message when we should probably just retry sending the email. By putting the task of sending emails into a job queue, we will both be able to make our web responses faster and can also configure our job queue to retry failed jobs.
This lesson preview is part of the Fullstack Flask: Build a Complete SaaS App with Flask course and can be unlocked immediately with a single-time purchase. Already have access to this course? Log in here.
Get unlimited access to Fullstack Flask: Build a Complete SaaS App with Flask with a single-time purchase.