Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions shexstatements/shexfromcsv.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,21 @@
data = ""
if filename:
# Validate and normalize the path while allowing relative subdirectories.
normalized_path = os.path.normpath(filepath.strip())
if not normalized_path:
raw_path = filepath.strip()
if not raw_path:
raise ValueError("Empty filename is not allowed")
# Normalize the user-supplied path.
normalized_path = os.path.normpath(raw_path)
# Disallow absolute paths supplied by the caller.
if os.path.isabs(normalized_path):
raise ValueError("Absolute paths are not allowed")
if normalized_path == ".." or normalized_path.startswith(".." + os.path.sep):
raise ValueError("Path traversal is not allowed")
with open(normalized_path) as csvfile:
# Resolve the path against a safe base directory (directory of this module).
base_dir = os.path.dirname(os.path.realpath(__file__))
real_candidate = os.path.realpath(os.path.join(base_dir, normalized_path))
# Ensure the final path is within the base directory to prevent path traversal.
if os.path.commonpath([base_dir, real_candidate]) != base_dir:
raise ValueError("Access to the specified file path is not allowed")
with open(real_candidate) as csvfile:

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression High

This path depends on a user-provided value.
csvreader = csv.reader(csvfile, delimiter=delim)
rowno = 0
for row in csvreader:
Expand Down
49 changes: 35 additions & 14 deletions shexstatements/shexfromspreadsheet.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from os import remove
from os.path import splitext
import tempfile

from odf.opendocument import load
from odf.table import TableCell, TableRow
Expand Down Expand Up @@ -46,13 +47,23 @@ def generate_shex_from_spreadsheet(filepath, skip_header=False, stream=None):

if(file_extension in {".xlsx", ".xlsm", ".xltx", ".xltm"}):
wb = None
temp_path = None
if stream is not None:
with open("tmp" + filepath, "wb") as sf:
sf.write(stream)
sf.close()
filepath = "tmp" + filepath
fd, temp_path = tempfile.mkstemp(suffix=file_extension)
try:
with open(fd, "wb") as sf:
sf.write(stream)
except TypeError:
# Fallback for environments where opening by fd is not supported
import os
os.close(fd)
with open(temp_path, "wb") as sf:
sf.write(stream)
filepath_to_open = temp_path
else:
filepath_to_open = filepath

wb = load_workbook(filepath)
wb = load_workbook(filepath_to_open)
for ws in wb.worksheets:
for i in range(1, ws.max_row+1):
line = list()
Expand All @@ -63,8 +74,8 @@ def generate_shex_from_spreadsheet(filepath, skip_header=False, stream=None):
line = "|".join(line)
data = data + line + "\n"

if stream is not None:
remove(filepath)
if stream is not None and temp_path is not None:
remove(temp_path)

elif(file_extension in {".xls"}):
wb = None
Expand All @@ -84,13 +95,23 @@ def generate_shex_from_spreadsheet(filepath, skip_header=False, stream=None):

elif(file_extension in {".ods"}):
wb = None
temp_path = None
if stream is not None:
with open("tmp" + filepath, "wb") as sf:
sf.write(stream)
sf.close()
filepath = "tmp" + filepath
fd, temp_path = tempfile.mkstemp(suffix=file_extension)
try:
with open(fd, "wb") as sf:
sf.write(stream)
except TypeError:
# Fallback for environments where opening by fd is not supported
import os
os.close(fd)
with open(temp_path, "wb") as sf:
sf.write(stream)
filepath_to_open = temp_path
else:
filepath_to_open = filepath

wb = load(filepath)
wb = load(filepath_to_open)
wb = wb.spreadsheet
rows = wb.getElementsByType(TableRow)
for row in rows:
Expand All @@ -101,8 +122,8 @@ def generate_shex_from_spreadsheet(filepath, skip_header=False, stream=None):
line.append(str(cell))
data = data+"|".join(line) + "\n"

if stream is not None:
remove(filepath)
if stream is not None and temp_path is not None:
remove(temp_path)

shexstatement = CSV.generate_shex_from_data_string(data)
except Exception as e:
Expand Down
Loading