
Using Threads with Flask

Background tasks for toy projects when Celery and a dedicated message queue service are too heavy.

I’ve decided to make an SSG website with Flask and Jinja templates, which is something I have not done in a while. This is partly an excuse to take a break from single page applications and write some vanilla JavaScript. It is also because I didn’t want to get caught up on the frontend, as the core of this project is heavily on the backend.

Everything was going very well until I decided I wanted to record which pages were being visited.

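from functools import wraps

from flask import current_app, make_response, render_template, request

# cache, DEFAULT_TIMEOUT, record_visit and graph_bp are defined elsewhere in the app.


def cache_with_node_id():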
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            node_id = request.args.get("nodeId")
            cache_key = f"{request.path}:{node_id}"
 
            cached_response = cache.get(cache_key)
            if cached_response is None:
                response = f(*args, **kwargs)
                cache.set(cache_key, response, timeout=DEFAULT_TIMEOUT)
            else:
                response = cached_response
 
            record_visit(node_id)
 
            if not isinstance(response, current_app.response_class):
                response = make_response(response)
 
            return response
        return decorated_function
    return decorator
 
 
@graph_bp.route("/graph")
@cache_with_node_id()
def graph():
    node_id = request.args.get("nodeId", default="e-1")
    # get graph data for requested node_id
    return render_template("graph.html", graph_data=graph_data)
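
The `record_visit` helper called by the decorator is not shown here. As a minimal sketch, assuming it writes one row to the same SQLite `visits` table used later in this post, it might look like the following. The `DB_PATH` constant and the `db_path` parameter are hypothetical names for illustration. The point is that the request waits for the insert and commit before the response can be returned.

```python
import sqlite3
from datetime import datetime, timezone

DB_PATH = "visits.db"  # hypothetical; the post reads the path from app.config["APP_DB"]


def record_visit(node_id, db_path=DB_PATH):
    """Write one visit row synchronously, blocking the caller until commit."""
    timestamp = datetime.now(timezone.utc).isoformat()
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            "INSERT INTO visits (node_id, timestamp) VALUES (?, ?)",
            (node_id, timestamp),
        )
        conn.commit()
    finally:
        # Close the connection even if the insert fails.
        conn.close()
```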

What To Do?

Assuming I’ve optimized the database write as far as I can, what else can I do to reduce the time record_visit adds to each request?

Most people would recognize this as a perfect example of a job for a task queue and worker. Throw the node ID onto the queue and let a separate process handle the writing to the database. For this to make sense, writing to the task queue has to be much faster than writing to the database, of course.

However, I am steadfast that this project should be an exercise in keeping things simple, but not necessarily scalable. It is a small project after all. This means I will refrain from including additional services such as a message queue or spinning up a second container of my app running as a worker.

Background Tasks

FastAPI (via Starlette) has Background Tasks, which run a given function after the response is returned; plain synchronous functions are handed off to a thread. That is essentially what I want to do, but without asyncio, because this is Flask and I’m not keen to mix the two.

Creating BackgroundTasks for Flask

background_tasks.py
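import atexit
import queue
import sqlite3
import threading
from datetime import UTC, datetime  # datetime.UTC requires Python 3.11+

from loguru import logger  # assumption: the log format shown later matches loguru's default


class BackgroundTasks: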
    def __init__(self, app=None):
        self.queue = queue.Queue(maxsize=1000)
        self.worker_thread = None
        self.db_path = None
        if app is not None:
            self.init_app(app)
 
    def init_app(self, app):
        self.db_path = app.config["APP_DB"]
        app.background_tasks = self
        atexit.register(self.teardown)
        self.start_worker()
 
    def start_worker(self):
        def worker():
            logger.debug("Starting worker")
            while True:
                try:
                    task = self.queue.get()
                    if task is None:
                        logger.info("Got sentinel value - Exiting thread")
                        break
                    self._record_visit(task["node_id"], task["timestamp"])
                except Exception as e:
                    logger.exception(f"Error processing task: {e}")
 
        self.worker_thread = threading.Thread(target=worker, daemon=True)
        self.worker_thread.start()
 
    def teardown(self, exception=None):
        self.queue.put(None)  # Send sentinel value to stop the worker
        if self.worker_thread:
            self.worker_thread.join(timeout=5)
 
    def record_visit(self, node_id):
        try:
            logger.info(f"Recording visit to node {node_id}")
            now = datetime.now(UTC).isoformat()
            self.queue.put_nowait({"node_id": node_id, "timestamp": now})
            return True
        except queue.Full:
            return False
 
    def _record_visit(self, node_id, timestamp):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO visits (node_id, timestamp) VALUES (?, ?)",
            (node_id, timestamp),
        )
        conn.commit()
        conn.close()

Like any other Flask extension, BackgroundTasks is initialized with init_app at some point during startup. Now record_visit can be replaced with current_app.background_tasks.record_visit, which places the arguments onto a queue. When the worker fetches an item off the queue, it writes the page visit to the database via _record_visit.
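
To see the queue, worker and sentinel working together without Flask, here is a stripped-down sketch of the same pattern using only the standard library. The `run_demo` function, the temporary database and the fixed timestamp are made up for the demonstration and are not part of the app.

```python
import os
import queue
import sqlite3
import tempfile
import threading


def run_demo(node_ids):
    """Enqueue visits, let one worker thread write them, then stop it."""
    db_path = os.path.join(tempfile.mkdtemp(), "visits.db")
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE visits (node_id TEXT, timestamp TEXT)")
    conn.commit()
    conn.close()

    tasks = queue.Queue(maxsize=1000)

    def worker():
        while True:
            task = tasks.get()
            if task is None:  # sentinel: everything queued before it is already written
                break
            # Open a connection per task, as _record_visit does in the post.
            conn = sqlite3.connect(db_path)
            conn.execute(
                "INSERT INTO visits (node_id, timestamp) VALUES (?, ?)",
                (task["node_id"], task["timestamp"]),
            )
            conn.commit()
            conn.close()

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()

    for node_id in node_ids:
        # Fixed timestamp keeps the demo deterministic.
        tasks.put_nowait({"node_id": node_id, "timestamp": "2024-08-21T00:00:27+00:00"})

    tasks.put(None)  # ask the worker to stop once the queue is drained
    thread.join(timeout=5)

    conn = sqlite3.connect(db_path)
    rows = conn.execute("SELECT node_id FROM visits ORDER BY rowid").fetchall()
    conn.close()
    return [row[0] for row in rows]
```

Because the queue is first-in, first-out, the sentinel is only seen after every visit queued before it has been written, so a clean shutdown drains the pending work as long as it finishes within the join timeout.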

One thing I learned was to make the thread a daemon. Otherwise, you end up with hanging and periodic errors from Flask not returning responses.
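
One likely contributor to the hanging: the interpreter waits for non-daemon threads at exit, and a worker blocked on queue.get() never finishes on its own. The sketch below is not from the app; `DAEMON_SCRIPT` and `exits_cleanly` are hypothetical names. It starts a child Python process whose only thread is stuck on an empty queue and checks that the process still exits because the thread is a daemon.

```python
import subprocess
import sys

# A tiny program whose worker thread blocks forever on an empty queue.
DAEMON_SCRIPT = """
import queue, threading
q = queue.Queue()
threading.Thread(target=q.get, daemon=True).start()
"""


def exits_cleanly(script, timeout=5):
    """Run the script in a child interpreter; False if it hangs past the timeout."""
    try:
        result = subprocess.run([sys.executable, "-c", script], timeout=timeout)
    except subprocess.TimeoutExpired:
        return False
    return result.returncode == 0
```

Changing daemon=True to daemon=False in the script should make the child process hang until the timeout, since nothing ever puts an item on the queue.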

Here is the new decorator for caching and recording page visits.

def cache_with_node_id():
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            node_id = request.args.get("nodeId")
            cache_key = f"{request.path}:{node_id}"
 
            cached_response = cache.get(cache_key)
            if cached_response is None:
                response = f(*args, **kwargs)
                cache.set(cache_key, response, timeout=DEFAULT_TIMEOUT)
            else:
                response = cached_response
 
            current_app.background_tasks.record_visit(node_id)
 
            if not isinstance(response, current_app.response_class):
                response = make_response(response)
 
            return response
        return decorated_function
    return decorator

In almost no time the page visit is written to the queue and the page response can be returned.

2024-08-21 00:00:27.532 | INFO     | app.views.graph_view:decorated_function:39 - Got cached: True
2024-08-21 00:00:27.534 | INFO     | app.background_tasks:record_visit:61 - Recording visit to node e-419

Deploy Flask with Threads

The final hurdle is deploying the Flask application, as WSGI servers like uWSGI do not enable Python threads out of the box. It was actually such a pain using uWSGI that I switched to gunicorn instead. With gunicorn, I only had to declare a few special configurations and my background tasks were working just like in development.

gunicorn_conf.py
workers = 1
worker_class = "gthread"
threads = 4

Later, I found decorators from uwsgidecorators could be attached to start_worker to make uWSGI work in production.

Limitations

Because database lock errors are a potential issue, I limited the application to a single worker. This is done with the gunicorn workers = 1 config, which ensures only one copy of the application is running and therefore one worker thread and one writer to the database at a time. Granted, this is more a limitation of SQLite, but it is something to keep in mind. And yes, I could correct for most database lock errors with retries or something similar, but that’s not simple. Remember, this is supposed to be a simple project.
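
For reference, the retry approach I decided against could be sketched roughly like this. It is a hypothetical helper, not something in the project: `with_retries`, `attempts` and `delay` are names made up here, and it assumes lock contention surfaces as sqlite3.OperationalError with "locked" in the message. sqlite3.connect also accepts a timeout argument that makes a connection wait for a lock before raising, which may be enough on its own.

```python
import sqlite3
import time


def with_retries(write, attempts=3, delay=0.05):
    """Call write(), retrying when SQLite reports the database is locked."""
    for attempt in range(1, attempts + 1):
        try:
            return write()
        except sqlite3.OperationalError as exc:
            # Only retry lock contention; re-raise other errors and the final failure.
            if "locked" not in str(exc) or attempt == attempts:
                raise
            time.sleep(delay * attempt)  # simple linear backoff
```

The worker would then wrap its write, for example with_retries(lambda: self._record_visit(node_id, timestamp)).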

Even with a database that can support multiple writers this solution is not great for larger projects. It ties the web app lifecycle to the worker so that killing the web app will silently kill whatever the worker is doing. Not to mention my in-memory queue will also drop all of the tasks not yet completed when the web app dies.

Given there is only one worker, the queue can quickly back up under load, and you cannot just spin up additional workers with this pattern.
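
What backing up means in practice: once the bounded queue is full, put_nowait raises queue.Full and the visit is dropped rather than delaying the response. A small sketch of that behaviour, with a deliberately tiny maxsize and a hypothetical `enqueue_visit` helper that mirrors record_visit:

```python
import queue


def enqueue_visit(tasks, node_id):
    """Return True if queued, False if the queue is full and the visit is dropped."""
    try:
        tasks.put_nowait({"node_id": node_id})
        return True
    except queue.Full:
        return False


tasks = queue.Queue(maxsize=2)  # tiny limit so the overflow is easy to see
results = [enqueue_visit(tasks, f"e-{i}") for i in range(3)]
```

With no worker draining the queue, the first two visits are accepted and the third is dropped.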

Another big issue with this implementation was raised by Izaac Zhou: a thread in Flask is not guaranteed to be tied to the request that triggered it. This can quickly lead to leaking data between requests if used improperly. It can be addressed as described in his blog post, but I’ve not implemented it as it does not affect my use case.

Conclusion

This was a fun little exercise to play with, and it allowed me to push a single Flask container a bit further so I don’t have to spin up additional services. Obviously not something to do for serious projects, but nice for simple ones.

Here, in no particular order, are some links related to this topic and various other implementations of running threads with Flask.

https://smirnov-am.github.io/background-jobs-with-flask/
https://devcodef1.com/news/1311478/flask-queue-implementation-without-celery
https://vmois.dev/python-flask-background-thread/
https://github.com/chrisjsimpson/flask-background-task-queue/blob/main/app.py
https://github.com/sintezcs/flask-threads
https://www.primerpy.com/2020/04/06/flask/flask-multi-threading/
