How to Queue and Delay Python Jobs With RQ and Redis
Get the project source code below, and follow along with the lesson material.
Download Project Source Code

To set up the project on your local machine, please follow the directions provided in the README.md file. If you run into any issues running the project source code, feel free to reach out to the author in the course's Discord channel.
Delayed Jobs
All of the Python code we have written so far has executed in the context of directly responding to an incoming web request. If we wanted to run anything that wasn't strictly necessary to render a page for the user (for example, sending a confirmation email or updating internal analytics), we would have to do it in a route, before returning a response.
If the code we are thinking of running could take a while to complete (like running a complicated query or talking to a slow API), our users would have to wait for that slow code to finish before getting a response. If the code might take longer than 60 seconds to run, we might not even be able to respond before the HTTP server in front of Flask cuts us off.
Our web controller code should be focused on rendering a response to the end user; any ancillary tasks, like sending a confirmation email, would ideally be executed later in a separate environment. To do that, we can set up a job queue and have a separate process act as a "worker" that listens to the queue and executes the code.
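To build intuition for the pattern before bringing in RQ, here is a minimal standard-library-only sketch of a queue with a worker. The function names and job IDs are made up for illustration; RQ does the same thing, but with Redis as the shared queue between separate processes rather than a thread in the same process.

```python
# Minimal sketch of the queue/worker pattern using only the standard
# library. RQ uses Redis as the shared queue between processes instead.
import queue
import threading

jobs = queue.Queue()          # stands in for the Redis-backed queue
results = {}

def worker():
    # The worker blocks waiting for jobs, runs them, and stores results.
    while True:
        job_id, func, args = jobs.get()
        if func is None:      # sentinel value to stop the worker
            break
        results[job_id] = func(*args)
        jobs.task_done()

def send_confirmation_email(address):
    # Placeholder for slow work (talking to a mail server, a slow API, ...)
    return f"sent to {address}"

t = threading.Thread(target=worker, daemon=True)
t.start()

# The web request handler only enqueues and returns immediately.
jobs.put(("job-1", send_confirmation_email, ("user@example.com",)))
jobs.put(("stop", None, ()))
t.join()
print(results["job-1"])  # -> sent to user@example.com
```

The key point is that the code enqueueing the job returns right away; only the worker pays the cost of the slow work.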
To create the queue and workers, we'll use a Python library called RQ (which stands for Redis Queue). If you have used other Python frameworks, RQ is similar to the Celery task queue, but simpler to work with. As you can infer from the name, this also means we will need to install and serve Redis, a fast in-memory data store that will hold our queue.
Setting up Redis
Installing Redis
To install Redis on macOS, run: brew install redis
and then run: brew services start redis
On Windows (using WSL) and Linux:
sudo apt-get install redis-server
sudo service redis-server start
Installing RQ
Then we will add rq, a Flask extension called Flask-RQ2, and a tool called rq-dashboard to our requirements.txt file.
Flask-RQ2
rq==1.4.3
rq-dashboard
(Due to a testing incompatibility, we temporarily need to pin our rq version to 1.4.3.)
Make sure your virtual environment is activated, then run pip install -r requirements.txt to install the three libraries.
Within yumroad/config.py, we need to tell RQ where to find Redis, so we will add a configuration variable.
class BaseConfig:
...
REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
RQ_REDIS_URL = REDIS_URL
RQ_DASHBOARD_REDIS_URL = RQ_REDIS_URL
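If the shape of the default URL is unfamiliar, here is a quick way to take it apart using only the standard library (Redis clients parse the URL into the same pieces):

```python
# What the pieces of the default REDIS_URL mean: scheme, host, port,
# and the Redis database number at the end of the path.
from urllib.parse import urlparse

url = urlparse("redis://localhost:6379/0")
print(url.scheme)            # redis
print(url.hostname)          # localhost
print(url.port)              # 6379
print(url.path.lstrip("/"))  # 0  (the Redis database number)
```

In production, setting the REDIS_URL environment variable overrides this default, since the config reads it with os.getenv.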
In our test environment, we don't want to delay jobs and we don't even need to connect to Redis, so under the test configuration we will disable asynchronous job processing by setting RQ_ASYNC to False and changing the type of connection.
class TestConfig:
...
RQ_ASYNC = False
RQ_CONNECTION_CLASS = 'fakeredis.FakeStrictRedis'
Within extensions.py, we can import RQ from flask_rq2 and initialize it.
from flask_rq2 import RQ
...
rq2 = RQ()
Then within yumroad/__init__.py, import rq2:
from yumroad.extensions import ( ... , rq2)
def create_app(environment_name='dev'):
...
rq2.init_app(app)
...
Configuring & Processing Jobs
Using Flask-RQ2, we can designate specific functions as jobs using a decorator.
@rq2.job
def average(x, y):
print("I am running")
return (x + y)/2
Then to invoke the function, you can queue it up by using the queue method. In this case it would be average.queue(1, 2).
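To see what the decorator is conceptually doing, here is a standard-library-only sketch (not RQ's actual implementation) of how a decorator can attach a queue method to a plain function. The real @rq2.job serializes the call and pushes it to Redis rather than a local list.

```python
# Sketch: a decorator that leaves the function callable as-is, but
# adds a .queue method that records the call instead of running it.
pending = []

def job(func):
    def queue(*args, **kwargs):
        pending.append((func.__name__, args, kwargs))
        return f"queued {func.__name__}"
    func.queue = queue  # the function itself still works normally
    return func

@job
def average(x, y):
    return (x + y) / 2

print(average(1, 2))        # 1.5  (direct call runs immediately)
print(average.queue(1, 2))  # queued average
print(pending)              # [('average', (1, 2), {})]
```

This is why calling average(1, 2) directly still runs the code inline, while average.queue(1, 2) defers it to a worker.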
To see this in action, we can try it out from the Flask shell. For the sake of example, let's add this function called average to extensions.py. We can invoke it using RQ2 by running flask shell.
>>> from yumroad.extensions import average
>>> average(1, 2)
I am running
1.5
>>> job = average.queue(1, 2)
>>> job
FlaskJob('375200f3-d380-4822-94d4-c18b6d88e914', enqueued_at=datetime.datetime(2020, 7, 27, 1, 31, 22, 156183))
To launch a worker, in a separate terminal session with your virtual environment activated, run flask rq worker.
This will result in the following output.
$ flask rq worker
18:31:39 Worker rq:worker:a81e81d9f5104b91bd43ed58b4522aa7: started, version 1.5.0
18:31:39 *** Listening on default...
18:31:39 default: yumroad.extensions.average(1, 2) (375200f3-d380-4822-94d4-c18b6d88e914)
I am running
18:31:39 default: Job OK (375200f3-d380-4822-94d4-c18b6d88e914)
18:31:39 Result is kept for 500 seconds
If we want to look up the result later, we can have RQ2 fetch it by retrieving the job by its job id.
>>> from yumroad.extensions import rq2
>>> job = rq2.get_queue().fetch_job('375200f3-d380-4822-94d4-c18b6d88e914')
>>> job.result
1.5
By default, jobs go into the default queue, but we can control which queues jobs go into for more precision, and control how many workers operate on each queue.
job = average.queue(3, 4, queue='important_math', timeout=60 * 5)
In this case, we would also need to tell our workers to listen to the important_math queue.
$ flask rq worker important_math default
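The order of the queue names on the worker command matters: a worker drains the first queue it was given before looking at the next. A small sketch of that selection logic (the queue and job names here are just for illustration):

```python
# Sketch of how a worker picks its next job: it checks the queues in
# the order it was started with, so earlier queues get priority.
from collections import deque

queues = {
    "important_math": deque(["job-A"]),
    "default": deque(["job-B", "job-C"]),
}

def next_job(listen_order):
    # Check each queue in the order the worker was given.
    for name in listen_order:
        if queues[name]:
            return queues[name].popleft()
    return None

order = ["important_math", "default"]
print(next_job(order))  # job-A (important_math is checked first)
print(next_job(order))  # job-B
print(next_job(order))  # job-C
```

So listing important_math before default means important_math jobs are picked up first whenever both queues have work.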
Scheduling Jobs
To schedule jobs, you can use the built-in scheduler in RQ.
average.schedule(timedelta(seconds=60), 1, 2)
average.schedule(datetime(2020, 4, 25, 11, 59, 59), 1, 2) # UTC
In addition to running a worker listening on queues, you will want to run the rq scheduler task so that RQ can keep an eye out for scheduled jobs and queue them up when the time comes. To run the scheduler, run the following command as well (in the background or in a different terminal window):
$ flask rq scheduler
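What schedule(timedelta(seconds=60), ...) amounts to is storing the job with a run-at timestamp (in UTC) and enqueueing it once that time has passed. A sketch of that arithmetic with the standard library (the is_due helper is hypothetical, not part of RQ):

```python
# The scheduler stores a run-at time and periodically checks whether
# any stored job is now due; due jobs get moved onto a regular queue.
from datetime import datetime, timedelta, timezone

now = datetime(2020, 4, 25, 11, 58, 59, tzinfo=timezone.utc)
run_at = now + timedelta(seconds=60)
print(run_at.isoformat())  # 2020-04-25T11:59:59+00:00

def is_due(job_time, current_time):
    return current_time >= job_time

print(is_due(run_at, now))                          # False
print(is_due(run_at, now + timedelta(seconds=61)))  # True
```

This is also why the datetime form of schedule is interpreted as UTC: the comparison only works if both timestamps are in the same timezone.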
Using a Job Queue for Emails
The process of sending an email doesn't need to happen in order to render a page to users. In fact, if our mail server has downtime or is slow, the user might see an error message when we should probably just retry sending the email. By putting the task of sending emails into a job queue, we can both make our web responses faster and configure our job queue to retry failed jobs.
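The retry behavior a job queue gives us can be sketched in a few lines of standard-library Python. The flaky mail function and the retry loop here are illustrative stand-ins, not RQ's actual retry machinery (which requeues failed jobs, as we'll see with the failed queue in the dashboard):

```python
# Sketch: if sending fails, the job is attempted again instead of the
# user ever seeing an error page.
attempts = []

def flaky_send_email(address):
    attempts.append(address)
    if len(attempts) < 3:
        raise ConnectionError("mail server temporarily down")
    return "delivered"

def run_with_retries(job, arg, max_retries=5):
    for attempt in range(max_retries):
        try:
            return job(arg)
        except ConnectionError:
            continue  # a real queue would requeue, usually with a delay
    return "failed"

print(run_with_retries(flaky_send_email, "user@example.com"))  # delivered
print(len(attempts))  # 3
```

The user's request finished long before any of these attempts happened; only the worker sees the transient failures.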
This lesson is part of the Fullstack Flask: Build a Complete SaaS App with Flask course.

[00:00 - 00:15] All of the code we've written so far has always been in the context of directly responding to a user request or running tests. If we wanted to run anything that wasn't strictly related to rendering a page, that would slow down the page for the user.
[00:16 - 00:30] And if we wanted to run anything that took longer than 60 seconds, for example, that might result in upstream errors, where our page takes too long to load and gets cut off. So the goal of our web controller code should always be focused on rendering a response to the user.
[00:31 - 00:44] Any ancillary tasks, like sending an email, would ideally be executed later in a separate environment. To do that, we can set up a job queue and have a separate process act as a worker to listen to the queue and execute the code.
[00:45 - 00:55] This is also one way that we can create jobs to run periodically, for example. To create the queue and workers, we're going to use a Python library called RQ, which stands for Redis Queue.
[00:56 - 01:10] If you have used other Python frameworks, RQ is similar to Celery as a task queue, but it's just simpler to work with. As you can infer from the name, this also means we're going to install and start a Redis server, which is a fast in-memory data store that will store our queue.
[01:11 - 01:18] So our first step is going to be setting up Redis. So we're going to go ahead and look at our terminal, where we're going to need to install Redis.
[01:19 - 01:25] So let's go and switch over to the Redis site. And on OSX, there are instructions on how you set it up using Homebrew.
[01:26 - 01:41] It's pretty simple. On Windows, using Windows Subsystem for Linux, or on Linux, you can most likely run apt-get install redis-server, or the appropriate command for your own version of Linux.
[01:42 - 01:43] Okay. Okay.
[01:44 - 01:55] So now we have Redis installed. Next up, we're going to install rq, a Flask extension called Flask-RQ2, and a tool called rq-dashboard into our Python project.
[01:56 - 02:05] So let's go ahead and go into VS Code and set those up. We're going to add Flask-RQ2 and rq-dashboard.
[02:06 - 02:18] Next we're going to go into our extensions file and initialize them. Now, temporarily, due to a testing incompatibility, we're going to have to pin our version of rq to a specific one.
[02:19 - 02:35] So this is how we pin versions in requirements.txt. And in practice, it's probably a good idea to pin almost all of your versions in the requirements.txt file so that you don't accidentally create conflicts between versions if there's a major upgrade.
[02:36 - 02:46] All right. Now in config.py, what we're going to need to do is set up the Redis URL and set up a few other fields that depend on it.
[02:47 - 02:55] All right. So first off, we're going to set REDIS_URL here, and then we're going to set RQ_REDIS_URL and RQ_DASHBOARD_REDIS_URL to that value.
[02:56 - 03:06] Next, in our test config, we're going to run jobs instantly without the need to spin up a worker. And the way we're going to set that up is by setting RQ_ASYNC to False.
[03:07 - 03:20] So we want stuff to run synchronously as opposed to asynchronously. And in production, what we're going to want is to have it set to async if there's no Redis URL already set.
[03:21 - 03:28] All right. So we can now go ahead and go to extensions.py and initialize everything.
[03:29 - 03:33] So from flask_rq2, we're going to import RQ.
[03:34 - 03:44] And then we're going to set rq2 equal to RQ(). If you're wondering why this is called RQ2, it's essentially a fork of the original library, which was Flask-RQ.
[03:45 - 03:51] All right. In yumroad/__init__.py, we're going to go ahead and import rq2 here.
[03:52 - 04:01] And then we're going to call init_app on that. All right.
[04:02 - 04:12] So in order to configure and process jobs using RQ2, we're going to have to decorate functions. So I'm going to go and create a really simple job here in extensions.py just temporarily.
[04:13 - 04:18] I'm going to create a function that does a lot of complicated math here. It computes an average.
[04:19 - 04:27] So it's going to say something like I am running. And then what it returns is X plus Y divided by two.
[04:28 - 04:33] So now we're going to define this as a job by saying @rq2.job. All right.
[04:34 - 04:39] Let's see what happens now. So in our terminal, we can go ahead and launch a shell.
[04:40 - 04:48] And, whoops, we first need to pip install -r requirements.txt. And now we can run a shell.
[04:49 - 04:57] Okay. What we're going to do is import average from yumroad.extensions.
[04:58 - 05:06] And so what we can do is directly call average and see what happens. And it runs for us.
[05:07 - 05:18] But what we can also do is queue it, so we can say average.queue and pass the arguments in there. And then it would hypothetically send something to Redis.
[05:19 - 05:25] However, here we see that there's a connection error. So let's go ahead and make sure Redis is running.
[05:26 - 05:39] So in order to start Redis, we're going to run sudo service redis-server start. And then we can run redis-cli and check that it connects and gives us back a result.
[05:40 - 05:47] Great. So now that we have the Redis server running, we can go ahead and run our shell again and import that job from yumroad.extensions.
[05:48 - 05:57] And so what we can do is say average.queue(1, 2) and see what happens. So it gets queued up, which is interesting.
[05:58 - 06:04] Let's queue up another job, maybe with four and five, and maybe some better numbers. Okay.
[06:05 - 06:15] So now we've queued up three jobs. To run a worker, what we can do is run flask rq worker, and that's going to spin up something that works through all the jobs on the queue.
[06:16 - 06:19] Okay. So you can see exactly what happened.
[06:20 - 06:35] It ran all three jobs and gave us an OK with the job ID. In order to fetch the results, we can go back into our shell, and from rq2 we can go ahead and get the results.
[06:36 - 06:43] So it's stored temporarily in Redis if we need to fetch it. If we don't care about the results, then we don't have to go back and get them.
[06:44 - 06:53] So we're going to call the get_queue method, which gets the default queue here. And then we're going to fetch the job whose ID we are looking for.
[06:54 - 07:05] Okay. So if we look at job, it's a FlaskJob instance, but we can also call .result and we get the number there. By default, jobs go into the default queue.
[07:06 - 07:16] But if we wanted to make them lower or higher priority, we could do that. So if we do average.queue(3, 4), then we can say queue='less_important'.
[07:17 - 07:29] We can also specify a timeout in seconds if we wanted. And then for a worker that's checking on that, we could say default and less_important.
[07:30 - 07:42] And we could run another one in front of this called more_important. So we'd process jobs in that order: more_important, default, and less_important.
[07:43 - 07:49] Okay. Now we're going to look at how we can use this in emails.
[07:50 - 07:57] The process of sending an email is something that can be a delayed job. So what we're going to do is we're going to create a folder within the YumRoad application called jobs.
[07:58 - 08:06] And this is where we're going to put all of our delayed jobs. I'm going to create a new one here called mailer.py and that's where we can put our job that we'll send emails.
[08:07 - 08:16] So the first thing is we're going to import rq2: from yumroad.extensions import rq2. Next, we're going to import Message.
[08:17 - 08:23] Okay. So now that we have the message, we can write a function that sends emails.
[08:24 - 08:27] Here it is. It takes in a bunch of parameters and it sends an email.
[08:28 - 08:45] And all we have to do to make this a job is go ahead and add the rq2.job decorator there. Now, within our email.py file, instead of having email.py always send the messages itself, what we can do is just queue up a message.
[08:46 - 09:05] So here, for our welcome message, we can just say that our body is equal to render_template, and then the rest we can make configurable. So here we could say that the subject is equal to this.
[09:06 - 09:19] Okay, and then to construct the messages, what we want to do is store this as recipients right here. So we can just send that there, and then the body is equal to the body.
[09:20 - 09:28] And then the last thing is we can pass the rest as keyword arguments. So store.user.email should also be cc'd here.
[09:29 - 09:37] Okay, so this is what our email now looks like. Let's do the same thing up here.
[09:38 - 09:45] And so we're going to set this to the body. And the email that we're going to use is user.email.
[09:46 - 09:53] We won't have anything we need to cc here. And the subject is going to be this.
[09:54 - 10:14] Okay, so now we have one that uses our delayed jobs, specifically the one defined in mailer.py. Before we test this out, let's go ahead and add a web dashboard where we can see jobs queued up before we run them.
[10:15 - 10:25] So in order to do this, what we can do is we can create a blueprint that is going to be specifically for the dashboard here. So we're going to call this RQ dashboard.
[10:26 - 10:32] Then we're going to use the blueprint that's defined in RQ dashboard. Then we also might want to think about layering authentication in on top of this.
[10:33 - 10:43] Probably not a smart idea to just let anyone access our dashboard. So we're going to have to think about how we're going to do that in Flask.
[10:44 - 11:03] It's going to be a function that takes in as many arguments as it was passed, since it's a decorator: args, keyword args. And we're going to say rq_blueprint.before_request.
[11:04 - 11:23] And what we're going to say is: check if the current user is authenticated, and if they are not authenticated, return abort(401). So we're going to have to import abort as well.
[11:24 - 11:28] Okay. So here is our authentication.
[11:29 - 11:31] Now this isn't very much authentication. Anyone can sign up.
[11:32 - 11:50] So you might want to consider adding some checks, like: if current_user.id is not equal to one, abort. That would also help. But for now, we're going to leave it relatively insecure by letting it be openly accessible.
[11:51 - 11:52] Okay. So our rq_dashboard blueprint is here.
[11:53 - 12:04] Then, going back into __init__.py, what we're going to want to do is import our rq_blueprint and register it at a URL prefix of /rq. So once we do that, we can check out what that looks like in our browser.
[12:05 - 12:12] So going to Firefox, we can see the page loads and I'm logged in. If I go to /rq, I can see a dashboard here.
[12:13 - 12:21] And if I log out and go to /rq, it says not authorized, which is exactly what we want. Log back in and go to the RQ dashboard.
[12:22 - 12:27] I can see the status of all the jobs here. Back in email.py, we're going to have to import mailer.
[12:28 - 12:47] So from yumroad.jobs import mailer. Okay.
[12:48 - 12:52] And then here I can actually go ahead and register a new account. Okay.
[12:53 - 13:02] So that registered successfully. Now going back to RQ, I can see that one job actually queued up here, and we can see what it is, but there are no workers running.
[13:03 - 13:13] So what we should do is we should start a worker on a separate terminal window and we can see what happens. So I'm going to do that now.
[13:14 - 13:20] One other thing we're going to have to fix is this import right here. We forgot to import mail here.
[13:21 - 13:33] Now you'll see that because we had that error there, our job ended up on a failed queue, which means our worker tried to handle it, but it failed. So what I'm first going to do is I'm going to start up our worker again.
[13:34 - 13:38] So as soon as I start up our worker, I can see that it's queued up here. I can actually go ahead and requeue our job.
[13:39 - 13:46] But we're going to have to exempt this blueprint from CSRF to make this work. Going back in our code, we're going to go back to yumroad's __init__.
[13:47 - 13:58] And in here, we're going to go ahead and exempt the entire rq_blueprint from CSRF. The reason is that the RQ dashboard itself isn't aware of all the CSRF protections we create.
[13:59 - 14:09] So going back, we can now refresh our RQ dashboard and check out the failed jobs. Okay, so here are our jobs.
[14:10 - 14:18] Now what we can do is hit requeue and have our worker process them. So at this point, our worker processed them, and I can see the mail in my inbox, which is great.
[14:19 - 14:34] Okay, so we just ran our code coverage testing here. And after installing fakeredis and running our test code coverage, we saw that there are a few lines of code missing.
[14:35 - 14:42] Namely, line 31, 10 through 11 here and 33 here. So let's go back into our code and fix those.
[14:43 - 14:47] So first off, we're going to go to extensions.py. We don't need this job to be here anymore.
[14:48 - 14:54] So we can delete it. Next off is we're going to go to RQ dashboards and we're going to actually test that code there.
[14:55 - 15:07] So we're going to create a new file called test_rq_dashboard.py and paste in some tests here. So here I've entered two tests that essentially make a request to the RQ dashboard.
[15:08 - 15:16] If we're not logged in, it should give us a 401. And if we are logged in, it should give us a 200 and return data to us.
[15:17 - 15:29] Now the last line of code that's missing is in yumroad's __init__.py, line 31. So if SENTRY_DSN is not defined, then this code is not going to be run.
[15:30 - 15:38] So what we can do is either mark it as not required to be covered, or define SENTRY_DSN. Either one works.
[15:39 - 15:51] What I'm going to do is I'm going to mark this as not being necessary to cover since it's dependent on our config settings. All right, going back to our test suite, we can go ahead and try and run our test coverage now.
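Marking a line as not needing coverage is done with a pragma comment that coverage.py recognizes. A small illustrative sketch; the create_app dict here is a made-up stand-in for the real Flask app factory, and SENTRY_DSN is read from the environment as in the project config:

```python
# coverage.py excludes any line ending in "# pragma: no cover" from the
# coverage report, which is how config-dependent code can be skipped.
import os

os.environ.pop("SENTRY_DSN", None)  # ensure it's unset for this demo
SENTRY_DSN = os.getenv("SENTRY_DSN")

def create_app():
    app = {"sentry_enabled": False}
    if SENTRY_DSN:  # pragma: no cover  (depends on deployment config)
        app["sentry_enabled"] = True
    return app

print(create_app())  # {'sentry_enabled': False}
```

The pragma only affects reporting; the line still runs normally whenever SENTRY_DSN is set.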
[15:52 - 15:58] Great. So we're at 100% code coverage.
[15:59 - 16:07] You'll notice a few deprecation warnings here. For the most part, they're fine, I think.
[16:08 - 16:09] In the next section, we're going to work on improving performance.