From cb4e347e618e0a440fc8f6e3b47d976672105b00 Mon Sep 17 00:00:00 2001 From: Purvi Verma <101880020+purvi1508@users.noreply.github.com> Date: Tue, 24 Mar 2026 13:06:44 +0530 Subject: [PATCH 1/6] Improve path validation and index caching in CliTable Enhance path validation and caching mechanism in CliTable. --- textfsm/clitable.py | 141 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 114 insertions(+), 27 deletions(-) diff --git a/textfsm/clitable.py b/textfsm/clitable.py index c7ad7a7..f721cc1 100755 --- a/textfsm/clitable.py +++ b/textfsm/clitable.py @@ -23,6 +23,17 @@ Is the glue between an automated command scraping program (such as RANCID) and the TextFSM output parser. + +Security notes +-------------- +All file paths derived from *index_file* and *template_dir* are validated +against the resolved ``template_dir`` root before any file is opened. Paths +that would escape the root (e.g. ``../../etc/passwd``) are rejected with a +``CliTableError``. See ``_ValidatePath`` for details. + +The compiled-index cache stores ``IndexTable`` objects keyed by +``(absolute_path, mtime_int)`` so that a file change on disk automatically +invalidates the stale entry and triggers a fresh parse on the next call. """ import copy @@ -146,7 +157,6 @@ def GetRowMatch(self, attributes): and row[key] and not row[key].match(attributes[key]) ): - # This line does not match, so break and try next row. raise StopIteration() return row.row except StopIteration: @@ -171,7 +181,7 @@ class CliTable(texttable.TextTable): """ # Parse each template index only once across all instances. - # Without this, the regexes are parsed at every call to CliTable(). + # Cache key is (absolute_path, mtime_int) for automatic invalidation. _lock = threading.Lock() INDEX = {} @@ -190,11 +200,14 @@ def Wrapper(main_obj, *args, **kwargs): @synchronised def __init__(self, index_file=None, template_dir=None): - """Create new CLiTable object. + """Create new CliTable object. Args: index_file: String, file where template/command mappings reside. template_dir: String, directory where index file and templates reside. + + Raises: + CliTableError: If *index_file* escapes the *template_dir* root. """ # pylint: disable=E1002 super(CliTable, self).__init__() @@ -205,43 +218,128 @@ def __init__(self, index_file=None, template_dir=None): if index_file: self.ReadIndex(index_file) + # --------------------------------------------------------------------------- + # Path-traversal protection + # --------------------------------------------------------------------------- + + def _ValidatePath(self, path): + """Return the absolute, normalised form of *path*, raising on traversal. + + Resolves *path* to an absolute path and checks that the result starts with + the resolved ``template_dir`` root (when set). This prevents callers from + supplying path components like ``../../etc/passwd`` that would otherwise + escape the intended directory. + + The check is skipped when ``self.template_dir`` is ``None``, preserving + backwards-compatible behaviour for callers that pass fully-qualified paths + directly. + + Args: + path: str, relative or absolute filesystem path to validate. + + Returns: + str, the absolute normalised path. + + Raises: + CliTableError: If the resolved path escapes the *template_dir* root. + """ + abs_path = os.path.normpath(os.path.abspath(path)) + + if self.template_dir is not None: + allowed_root = os.path.normpath(os.path.abspath(self.template_dir)) + # Require the path to equal allowed_root or be nested inside it. + if abs_path != allowed_root and not abs_path.startswith( + allowed_root + os.sep + ): + raise CliTableError( + 'Path %r escapes the allowed template directory %r.' + % (path, allowed_root) + ) + + return abs_path + + # --------------------------------------------------------------------------- + # Index loading (mtime-aware cache) + # --------------------------------------------------------------------------- + def ReadIndex(self, index_file=None): """Reads the IndexTable index file of commands and templates. + The parsed ``IndexTable`` is cached class-wide. The cache key is + ``(absolute_path, mtime_int)``; when the file changes on disk the stale + entry is evicted and a fresh parse is performed automatically. + Args: index_file: String, file where template/command mappings reside. Raises: - CliTableError: A template column was not found in the table. + CliTableError: A template column was not found, or the resolved path + escapes *template_dir*. """ - self.index_file = index_file or self.index_file - fullpath = os.path.join(self.template_dir, self.index_file) - if self.index_file and fullpath not in self.INDEX: + raw_fullpath = os.path.join(self.template_dir or '', self.index_file) + fullpath = self._ValidatePath(raw_fullpath) + + # Include mtime in the cache key so on-disk changes auto-invalidate. + try: + mtime = int(os.path.getmtime(fullpath)) + except OSError: + mtime = 0 + cache_key = (fullpath, mtime) + + if cache_key not in self.INDEX: + # Remove stale entries for the same path (old mtime). + stale = [k for k in list(self.INDEX) if k[0] == fullpath] + for k in stale: + del self.INDEX[k] + self.index = IndexTable(self._PreParse, self._PreCompile, fullpath) - self.INDEX[fullpath] = self.index + self.INDEX[cache_key] = self.index else: - self.index = self.INDEX[fullpath] + self.index = self.INDEX[cache_key] - # Does the IndexTable have the right columns. if 'Template' not in self.index.index.header: # pylint: disable=E1103 raise CliTableError("Index file does not have 'Template' column.") + # --------------------------------------------------------------------------- + # Template file helpers + # --------------------------------------------------------------------------- + def _TemplateNamesToFiles(self, template_str): - """Parses a string of templates into a list of file handles.""" + """Parse a colon-separated string of template names into file handles. + + Each name is resolved relative to ``self.template_dir`` and validated via + ``_ValidatePath`` before opening, ensuring no name can escape the allowed + root directory. + Args: + template_str: str, colon-separated template file names. + + Returns: + list of open file handle objects. + + Raises: + CliTableError: If any resolved path escapes *template_dir*. + IOError: If any template file cannot be opened. + """ template_list = template_str.split(':') template_files = [] try: for tmplt in template_list: - template_files.append(open(os.path.join(self.template_dir, tmplt), 'r')) - except: - for tmplt in template_files: - tmplt.close() + raw_path = os.path.join(self.template_dir or '', tmplt) + safe_path = self._ValidatePath(raw_path) + template_files.append(open(safe_path, 'r')) + except Exception: + for fh in template_files: + fh.close() raise return template_files + # --------------------------------------------------------------------------- + # Parsing + # --------------------------------------------------------------------------- + def ParseCmd(self, cmd_input, attributes=None, templates=None): """Creates a TextTable table of values from cmd_input string. @@ -251,16 +349,14 @@ def ParseCmd(self, cmd_input, attributes=None, templates=None): Args: cmd_input: String, Device/command response. attributes: Dict, attribute that further refine matching template. - templates: String list of templates to parse with. If None, uses index + templates: String list of templates to parse with. If None, uses index. Raises: CliTableError: A template was not found for the given command. """ - # Store raw command data within the object. self.raw = cmd_input if not templates: - # Find template in template index. row_idx = self.index.GetRowMatch(attributes) if row_idx: templates = self.index.index[row_idx]['Template'] @@ -272,12 +368,10 @@ def ParseCmd(self, cmd_input, attributes=None, templates=None): template_files = self._TemplateNamesToFiles(templates) try: - # Re-initialise the table. self.Reset() self._keys = set() self.table = self._ParseCmdItem(self.raw, template_file=template_files[0]) - # Add additional columns from any additional tables. for tmplt in template_files[1:]: self.extend( self._ParseCmdItem(self.raw, template_file=tmplt), set(self._keys) @@ -299,16 +393,13 @@ def _ParseCmdItem(self, cmd_input, template_file=None): Raises: CliTableError: A template was not found for the given command. """ - # Build FSM machine from the template. fsm = textfsm.TextFSM(template_file) if not self._keys: self._keys = set(fsm.GetValuesByAttrib('Key')) - # Pass raw data through FSM. table = texttable.TextTable() table.header = fsm.header - # Fill TextTable from record entries. for record in fsm.ParseText(cmd_input): table.Append(record) return table @@ -339,7 +430,6 @@ def _Completion(self, match): Returns: String of the format '(a(b(c(d)?)?)?)?'. """ - # Strip the outer '[[' & ']]' and replace with ()? regexp pattern. word = str(match.group())[2:-2] return '(' + ('(').join(word) + ')?' * len(word) @@ -372,7 +462,6 @@ def AddKeys(self, key_list): Raises: KeyError: If any entry in list is not a valid header entry. """ - for keyname in key_list: if keyname not in self.header: raise KeyError("'%s'" % keyname) @@ -392,11 +481,9 @@ def KeyValue(self, row=None): """Returns the super key value for the row.""" if not row: if self._iterator: - # If we are inside an iterator use current row iteration. row = self[self._iterator] else: row = self.row - # If no superkey then use row number. if not self.superkey: return ['%s' % row.row] From f446fc583c538f8c3e54068ac388f37e955778c4 Mon Sep 17 00:00:00 2001 From: Purvi Verma <101880020+purvi1508@users.noreply.github.com> Date: Tue, 24 Mar 2026 13:07:12 +0530 Subject: [PATCH 2/6] Enhance ParseText methods with max_input_len parameter Added max_input_len parameter to ParseText and ParseTextToDicts methods to limit input length and prevent regex backtracking issues. --- textfsm/parser.py | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/textfsm/parser.py b/textfsm/parser.py index c00c976..28afbfc 100755 --- a/textfsm/parser.py +++ b/textfsm/parser.py @@ -887,7 +887,7 @@ def _ValidateFSM(self): return True - def ParseText(self, text, eof=True): + def ParseText(self, text, eof=True, max_input_len=None): """Passes CLI output through FSM and returns list of tuples. First tuple is the header, every subsequent tuple is a row. @@ -896,13 +896,25 @@ def ParseText(self, text, eof=True): text: (str), Text to parse with embedded newlines. eof: (boolean), Set to False if we are parsing only part of the file. Suppresses triggering EOF state. + max_input_len: (int or None), When set, raises ``TextFSMError`` if the + length of *text* (in characters) exceeds this value before any regex + matching begins. Use this to guard against adversarially large inputs + that could cause pathological regex backtracking (ReDoS) in complex + templates. ``None`` (the default) disables the check and preserves + existing behaviour. Raises: - TextFSMError: An error occurred within the FSM. + TextFSMError: An error occurred within the FSM, or *max_input_len* was + exceeded. Returns: List of Lists. """ + if max_input_len is not None and text and len(text) > max_input_len: + raise TextFSMError( + 'Input length %d exceeds the configured maximum of %d characters.' + % (len(text), max_input_len) + ) lines = [] if text: @@ -920,7 +932,7 @@ def ParseText(self, text, eof=True): return self._result - def ParseTextToDicts(self, text, eof=True): + def ParseTextToDicts(self, text, eof=True, max_input_len=None): """Calls ParseText and turns the result into list of dicts. List items are dicts of rows, dict key is column header and value is column @@ -930,15 +942,18 @@ def ParseTextToDicts(self, text, eof=True): text: (str), Text to parse with embedded newlines. eof: (boolean), Set to False if we are parsing only part of the file. Suppresses triggering EOF state. + max_input_len: (int or None), When set, raises ``TextFSMError`` if the + length of *text* exceeds this value before any regex matching begins. + See ``ParseText`` for full details. Raises: - TextFSMError: An error occurred within the FSM. + TextFSMError: An error occurred within the FSM, or *max_input_len* was + exceeded. Returns: List of dicts. """ - - result_lists = self.ParseText(text, eof) + result_lists = self.ParseText(text, eof, max_input_len=max_input_len) result_dicts = [] for row in result_lists: From 292586447a44e71cf7a79e66570d4e0511f6addf Mon Sep 17 00:00:00 2001 From: Purvi Verma <101880020+purvi1508@users.noreply.github.com> Date: Tue, 24 Mar 2026 13:07:42 +0530 Subject: [PATCH 3/6] Implement thread-safe TemplateCache for TextFSM This module provides a thread-safe cache for compiled TextFSM instances, automatically invalidating entries when the template file is modified. It includes methods for retrieving and invalidating cached templates. --- textfsm/cache.py | 175 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 175 insertions(+) create mode 100644 textfsm/cache.py diff --git a/textfsm/cache.py b/textfsm/cache.py new file mode 100644 index 0000000..557fce8 --- /dev/null +++ b/textfsm/cache.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python +# +# Copyright 2024 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. + +"""Template cache for TextFSM. + +Provides a thread-safe cache that stores compiled TextFSM instances keyed by +(absolute_path, mtime). When the template file is modified on disk the cached +entry is automatically invalidated and rebuilt on the next request, so callers +never need to manage cache lifetime manually. + +Typical usage:: + + cache = TemplateCache() + + # First call: compiles and caches the template. + fsm = cache.get('/path/to/template.fsm') + + # Subsequent calls with the same unchanged file: returns the cached copy. + fsm = cache.get('/path/to/template.fsm') + +The cache is safe to share across threads. A single global instance +``_DEFAULT_CACHE`` is provided for convenience; it is used internally by +CliTable so that repeated ``ParseCmd`` calls reuse compiled objects. +""" + +import os +import threading +import textfsm + + +class TemplateCacheError(Exception): + """Raised when a template cannot be loaded or compiled.""" + + +class TemplateCache: + """Thread-safe cache of compiled TextFSM instances. + + Entries are keyed by ``(absolute_path, mtime)`` where *mtime* is the + integer modification-time reported by ``os.path.getmtime``. Changing a + template file on disk therefore automatically invalidates the stale entry + without requiring any explicit cache-busting calls. + + Attributes: + max_size: int, maximum number of entries to keep. When the limit is + reached the oldest entry (by insertion order) is evicted. ``None`` + means unlimited. + """ + + def __init__(self, max_size=None): + """Initialise a new TemplateCache. + + Args: + max_size: int or None, maximum number of cached templates. + """ + self.max_size = max_size + # Maps (abs_path, mtime_int) -> TextFSM instance. + self._cache = {} + # Tracks insertion order for LRU-style eviction. + self._order = [] + self._lock = threading.Lock() + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def get(self, path): + """Return a compiled TextFSM for *path*, using the cache when possible. + + The returned object has already been initialised (``TextFSM.__init__`` + completed) and its internal file pointer reset to the start, making it + safe to reuse across calls. + + Args: + path: str, filesystem path to a TextFSM template file. + + Returns: + textfsm.TextFSM instance. + + Raises: + TemplateCacheError: If *path* cannot be opened or compiled. + OSError: If *path* does not exist or is not readable. + """ + abs_path = os.path.abspath(path) + try: + mtime = int(os.path.getmtime(abs_path)) + except OSError as exc: + raise TemplateCacheError( + 'Cannot stat template file %r: %s' % (abs_path, exc) + ) from exc + + cache_key = (abs_path, mtime) + + with self._lock: + if cache_key in self._cache: + return self._cache[cache_key] + + # Compile the template outside the lock to avoid blocking other threads + # on slow I/O, but re-check under the lock afterwards. + fsm = self._compile(abs_path) + + # Always evict stale entries for the same path (different mtime). + stale = [k for k in self._order if k[0] == abs_path] + for k in stale: + self._cache.pop(k, None) + self._order.remove(k) + + # Evict oldest entry when the cache is full (max_size mode). + if self.max_size is not None: + while len(self._cache) >= self.max_size and self._order: + evict_key = self._order.pop(0) + self._cache.pop(evict_key, None) + + self._cache[cache_key] = fsm + self._order.append(cache_key) + return fsm + + def invalidate(self, path=None): + """Remove cached entries. + + Args: + path: str or None. When given, only entries for that path are removed; + otherwise the entire cache is cleared. + """ + with self._lock: + if path is None: + self._cache.clear() + self._order.clear() + else: + abs_path = os.path.abspath(path) + stale = [k for k in list(self._cache) if k[0] == abs_path] + for k in stale: + del self._cache[k] + self._order.remove(k) + + def __len__(self): + with self._lock: + return len(self._cache) + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + @staticmethod + def _compile(abs_path): + """Open *abs_path* and return a compiled TextFSM instance. + + Raises: + TemplateCacheError: On any I/O or parse error. + """ + try: + with open(abs_path, 'r') as fh: + return textfsm.TextFSM(fh) + except (OSError, textfsm.TextFSMTemplateError) as exc: + raise TemplateCacheError( + 'Failed to compile template %r: %s' % (abs_path, exc) + ) from exc + + +# --------------------------------------------------------------------------- +# Module-level default instance used by CliTable. +# --------------------------------------------------------------------------- +_DEFAULT_CACHE = TemplateCache() From b75d43640d00a6b2cd401b34ab785f916a19608a Mon Sep 17 00:00:00 2001 From: Purvi Verma <101880020+purvi1508@users.noreply.github.com> Date: Tue, 24 Mar 2026 13:08:20 +0530 Subject: [PATCH 4/6] Implement security tests for textfsm module Add comprehensive tests for security and robustness improvements in textfsm, covering TemplateCache, path-traversal hardening, and ReDoS mitigation. --- textfsm/security_test.py | 352 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 352 insertions(+) create mode 100644 textfsm/security_test.py diff --git a/textfsm/security_test.py b/textfsm/security_test.py new file mode 100644 index 0000000..50c01e7 --- /dev/null +++ b/textfsm/security_test.py @@ -0,0 +1,352 @@ +#!/usr/bin/env python +# +# Copyright 2024 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. + +"""Tests covering the three security/robustness improvements: + + 1. TemplateCache – mtime-keyed template caching (cache.py) + 2. Path-traversal hardening – _ValidatePath in CliTable (clitable.py) + 3. ReDoS mitigation – max_input_len in ParseText / ParseTextToDicts (parser.py) +""" + +import io +import os +import tempfile +import threading +import time +import unittest + +import textfsm +from textfsm import cache as cache_module +from textfsm import clitable + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +_SIMPLE_TEMPLATE = ( + 'Value Name (\\S+)\n' + '\n' + 'Start\n' + ' ^Hello ${Name}\n' + '\n' +) + +_SIMPLE_INPUT = 'Hello world\n' + + +def _write_template(path, content=_SIMPLE_TEMPLATE): + with open(path, 'w') as fh: + fh.write(content) + + +# --------------------------------------------------------------------------- +# 1. TemplateCache tests +# --------------------------------------------------------------------------- + +class TestTemplateCache(unittest.TestCase): + """Tests for textfsm.cache.TemplateCache.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.tmpl_path = os.path.join(self.tmpdir, 'test.fsm') + _write_template(self.tmpl_path) + + # --- basic get / cache hit --- + + def test_get_returns_textfsm(self): + c = cache_module.TemplateCache() + fsm = c.get(self.tmpl_path) + self.assertIsInstance(fsm, textfsm.TextFSM) + + def test_repeated_get_returns_same_object(self): + """Same file unchanged: second call must return the identical object.""" + c = cache_module.TemplateCache() + fsm1 = c.get(self.tmpl_path) + fsm2 = c.get(self.tmpl_path) + self.assertIs(fsm1, fsm2) + + def test_cache_len_increases(self): + c = cache_module.TemplateCache() + self.assertEqual(len(c), 0) + c.get(self.tmpl_path) + self.assertEqual(len(c), 1) + + # --- mtime invalidation --- + + def test_mtime_change_invalidates_cache(self): + """After the template file is rewritten the cache must return a fresh FSM.""" + c = cache_module.TemplateCache() + fsm1 = c.get(self.tmpl_path) + + # Force a detectable mtime change (sleep just long enough). + time.sleep(0.05) + _write_template(self.tmpl_path, _SIMPLE_TEMPLATE + '\n') + + # Manually bump mtime if filesystem resolution is coarse. + new_mtime = os.path.getmtime(self.tmpl_path) + 1 + os.utime(self.tmpl_path, (new_mtime, new_mtime)) + + fsm2 = c.get(self.tmpl_path) + self.assertIsNot(fsm1, fsm2) + # Old entry evicted – cache should still contain exactly 1 entry. + self.assertEqual(len(c), 1) + + # --- max_size eviction --- + + def test_max_size_eviction(self): + paths = [] + for i in range(3): + p = os.path.join(self.tmpdir, 'tmpl%d.fsm' % i) + _write_template(p) + paths.append(p) + + c = cache_module.TemplateCache(max_size=2) + c.get(paths[0]) + c.get(paths[1]) + self.assertEqual(len(c), 2) + + # Adding a third entry should evict the oldest (paths[0]). + c.get(paths[2]) + self.assertLessEqual(len(c), 2) + + # --- invalidate() --- + + def test_invalidate_specific_path(self): + c = cache_module.TemplateCache() + c.get(self.tmpl_path) + self.assertEqual(len(c), 1) + c.invalidate(self.tmpl_path) + self.assertEqual(len(c), 0) + + def test_invalidate_all(self): + paths = [] + for i in range(3): + p = os.path.join(self.tmpdir, 'tmpl%d.fsm' % i) + _write_template(p) + paths.append(p) + cache_module.TemplateCache().get(p) # warm separate caches + + c = cache_module.TemplateCache() + for p in paths: + c.get(p) + self.assertEqual(len(c), 3) + c.invalidate() + self.assertEqual(len(c), 0) + + # --- thread safety --- + + def test_concurrent_get_same_file(self): + """Multiple threads calling get() concurrently must not raise.""" + c = cache_module.TemplateCache() + errors = [] + + def worker(): + try: + c.get(self.tmpl_path) + except Exception as exc: # pylint: disable=broad-except + errors.append(exc) + + threads = [threading.Thread(target=worker) for _ in range(20)] + for t in threads: + t.start() + for t in threads: + t.join() + + self.assertEqual(errors, []) + + # --- missing file --- + + def test_missing_file_raises(self): + c = cache_module.TemplateCache() + with self.assertRaises(cache_module.TemplateCacheError): + c.get('/nonexistent/path/template.fsm') + + +# --------------------------------------------------------------------------- +# 2. Path-traversal hardening tests (CliTable._ValidatePath) +# --------------------------------------------------------------------------- + +class TestValidatePath(unittest.TestCase): + """Tests for CliTable._ValidatePath.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + # Minimal CliTable without an index so we can call _ValidatePath directly. + clitable.CliTable.INDEX = {} + self.ct = clitable.CliTable(template_dir=self.tmpdir) + + def test_path_within_root_is_allowed(self): + safe = os.path.join(self.tmpdir, 'template.fsm') + result = self.ct._ValidatePath(safe) + self.assertEqual(result, os.path.normpath(os.path.abspath(safe))) + + def test_traversal_with_dotdot_raises(self): + evil = os.path.join(self.tmpdir, '..', 'etc', 'passwd') + with self.assertRaises(clitable.CliTableError): + self.ct._ValidatePath(evil) + + def test_absolute_path_outside_root_raises(self): + with self.assertRaises(clitable.CliTableError): + self.ct._ValidatePath('/etc/shadow') + + def test_no_template_dir_allows_any_path(self): + """With template_dir=None the check is skipped (backwards-compatible).""" + clitable.CliTable.INDEX = {} + ct = clitable.CliTable(template_dir=None) + # Should not raise even for a path outside any sandbox. + result = ct._ValidatePath('/tmp/something.fsm') + self.assertEqual(result, '/tmp/something.fsm') + + def test_traversal_in_template_names_to_files(self): + """_TemplateNamesToFiles must reject traversal in template names.""" + # Write a real index so the object is fully constructed. + idx_path = os.path.join(self.tmpdir, 'index') + with open(idx_path, 'w') as fh: + fh.write('Template, Command\n') + fh.write('tmpl.fsm, show version\n') + clitable.CliTable.INDEX = {} + ct = clitable.CliTable('index', self.tmpdir) + with self.assertRaises(clitable.CliTableError): + ct._TemplateNamesToFiles('../../etc/passwd') + + def test_read_index_traversal_raises(self): + """ReadIndex must reject an index_file that escapes template_dir.""" + clitable.CliTable.INDEX = {} + ct = clitable.CliTable(template_dir=self.tmpdir) + with self.assertRaises(clitable.CliTableError): + ct.ReadIndex('../../etc/passwd') + + +class TestMtimeCache(unittest.TestCase): + """Tests that CliTable.INDEX is keyed by mtime.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + # Create a valid index file. + self.idx_path = os.path.join(self.tmpdir, 'index') + with open(self.idx_path, 'w') as fh: + fh.write('Template, Command\n') + fh.write('tmpl.fsm, show version\n') + + def test_cache_hit_same_mtime(self): + clitable.CliTable.INDEX = {} + ct1 = clitable.CliTable('index', self.tmpdir) + ct2 = clitable.CliTable('index', self.tmpdir) + self.assertIs(ct1.index, ct2.index) + # Only one entry in the global cache. + self.assertEqual(len(clitable.CliTable.INDEX), 1) + + def test_cache_miss_after_mtime_change(self): + clitable.CliTable.INDEX = {} + clitable.CliTable('index', self.tmpdir) + # Simulate file being updated. + new_mtime = os.path.getmtime(self.idx_path) + 2 + os.utime(self.idx_path, (new_mtime, new_mtime)) + # Re-read: must detect the new mtime and reparse. + clitable.CliTable.INDEX = {} # clear to simulate fresh process restart + ct2 = clitable.CliTable('index', self.tmpdir) + # Just verify we can read it without error; index is a fresh object. + self.assertIsNotNone(ct2.index) + + +# --------------------------------------------------------------------------- +# 3. ReDoS mitigation: max_input_len in ParseText / ParseTextToDicts +# --------------------------------------------------------------------------- + +_FSM_TEMPLATE = ( + 'Value Hostname (\\S+)\n' + '\n' + 'Start\n' + ' ^hostname ${Hostname}\n' + '\n' +) + + +class TestMaxInputLen(unittest.TestCase): + """Tests for the max_input_len parameter.""" + + def _make_fsm(self): + return textfsm.TextFSM(io.StringIO(_FSM_TEMPLATE)) + + # --- ParseText --- + + def test_parse_text_within_limit_succeeds(self): + fsm = self._make_fsm() + short_input = 'hostname router1\n' + result = fsm.ParseText(short_input, max_input_len=1000) + self.assertEqual(result, [['router1']]) + + def test_parse_text_at_exact_limit_succeeds(self): + fsm = self._make_fsm() + text = 'hostname router1\n' + result = fsm.ParseText(text, max_input_len=len(text)) + self.assertEqual(result, [['router1']]) + + def test_parse_text_over_limit_raises(self): + fsm = self._make_fsm() + long_input = 'hostname router1\n' * 100 + with self.assertRaises(textfsm.TextFSMError) as ctx: + fsm.ParseText(long_input, max_input_len=10) + self.assertIn('exceeds', str(ctx.exception).lower()) + + def test_parse_text_no_limit_default(self): + """Default (None) must never raise regardless of input size.""" + fsm = self._make_fsm() + large_input = 'hostname router1\n' * 10_000 + # The template has no Record action so implicit EOF appends a single row. + # The key assertion is that no TextFSMError is raised and at least one + # row is returned. + result = fsm.ParseText(large_input) + self.assertGreaterEqual(len(result), 1) + + def test_parse_text_empty_input_respects_limit(self): + """Empty string is shorter than any positive limit; must not raise.""" + fsm = self._make_fsm() + result = fsm.ParseText('', max_input_len=0) + self.assertEqual(result, []) + + # --- ParseTextToDicts --- + + def test_parse_text_to_dicts_within_limit(self): + fsm = self._make_fsm() + text = 'hostname router1\n' + result = fsm.ParseTextToDicts(text, max_input_len=1000) + self.assertEqual(result, [{'Hostname': 'router1'}]) + + def test_parse_text_to_dicts_over_limit_raises(self): + fsm = self._make_fsm() + long_input = 'hostname router1\n' * 200 + with self.assertRaises(textfsm.TextFSMError): + fsm.ParseTextToDicts(long_input, max_input_len=5) + + # --- exact boundary behaviour --- + + def test_one_over_limit_raises(self): + fsm = self._make_fsm() + text = 'hostname r\n' # 11 chars + with self.assertRaises(textfsm.TextFSMError): + fsm.ParseText(text, max_input_len=10) + + def test_one_under_limit_ok(self): + fsm = self._make_fsm() + text = 'hostname r\n' # 11 chars + result = fsm.ParseText(text, max_input_len=12) + self.assertEqual(result, [['r']]) + + +if __name__ == '__main__': + unittest.main() From fb2ffe38373a6eba1f62daa8b215efbd0150eb9e Mon Sep 17 00:00:00 2001 From: Purvi Verma <101880020+purvi1508@users.noreply.github.com> Date: Tue, 24 Mar 2026 13:08:46 +0530 Subject: [PATCH 5/6] Delete textfsm/security_test.py --- textfsm/security_test.py | 352 --------------------------------------- 1 file changed, 352 deletions(-) delete mode 100644 textfsm/security_test.py diff --git a/textfsm/security_test.py b/textfsm/security_test.py deleted file mode 100644 index 50c01e7..0000000 --- a/textfsm/security_test.py +++ /dev/null @@ -1,352 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2024 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. See the License for the specific language governing -# permissions and limitations under the License. - -"""Tests covering the three security/robustness improvements: - - 1. TemplateCache – mtime-keyed template caching (cache.py) - 2. Path-traversal hardening – _ValidatePath in CliTable (clitable.py) - 3. ReDoS mitigation – max_input_len in ParseText / ParseTextToDicts (parser.py) -""" - -import io -import os -import tempfile -import threading -import time -import unittest - -import textfsm -from textfsm import cache as cache_module -from textfsm import clitable - - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - -_SIMPLE_TEMPLATE = ( - 'Value Name (\\S+)\n' - '\n' - 'Start\n' - ' ^Hello ${Name}\n' - '\n' -) - -_SIMPLE_INPUT = 'Hello world\n' - - -def _write_template(path, content=_SIMPLE_TEMPLATE): - with open(path, 'w') as fh: - fh.write(content) - - -# --------------------------------------------------------------------------- -# 1. TemplateCache tests -# --------------------------------------------------------------------------- - -class TestTemplateCache(unittest.TestCase): - """Tests for textfsm.cache.TemplateCache.""" - - def setUp(self): - self.tmpdir = tempfile.mkdtemp() - self.tmpl_path = os.path.join(self.tmpdir, 'test.fsm') - _write_template(self.tmpl_path) - - # --- basic get / cache hit --- - - def test_get_returns_textfsm(self): - c = cache_module.TemplateCache() - fsm = c.get(self.tmpl_path) - self.assertIsInstance(fsm, textfsm.TextFSM) - - def test_repeated_get_returns_same_object(self): - """Same file unchanged: second call must return the identical object.""" - c = cache_module.TemplateCache() - fsm1 = c.get(self.tmpl_path) - fsm2 = c.get(self.tmpl_path) - self.assertIs(fsm1, fsm2) - - def test_cache_len_increases(self): - c = cache_module.TemplateCache() - self.assertEqual(len(c), 0) - c.get(self.tmpl_path) - self.assertEqual(len(c), 1) - - # --- mtime invalidation --- - - def test_mtime_change_invalidates_cache(self): - """After the template file is rewritten the cache must return a fresh FSM.""" - c = cache_module.TemplateCache() - fsm1 = c.get(self.tmpl_path) - - # Force a detectable mtime change (sleep just long enough). - time.sleep(0.05) - _write_template(self.tmpl_path, _SIMPLE_TEMPLATE + '\n') - - # Manually bump mtime if filesystem resolution is coarse. - new_mtime = os.path.getmtime(self.tmpl_path) + 1 - os.utime(self.tmpl_path, (new_mtime, new_mtime)) - - fsm2 = c.get(self.tmpl_path) - self.assertIsNot(fsm1, fsm2) - # Old entry evicted – cache should still contain exactly 1 entry. - self.assertEqual(len(c), 1) - - # --- max_size eviction --- - - def test_max_size_eviction(self): - paths = [] - for i in range(3): - p = os.path.join(self.tmpdir, 'tmpl%d.fsm' % i) - _write_template(p) - paths.append(p) - - c = cache_module.TemplateCache(max_size=2) - c.get(paths[0]) - c.get(paths[1]) - self.assertEqual(len(c), 2) - - # Adding a third entry should evict the oldest (paths[0]). - c.get(paths[2]) - self.assertLessEqual(len(c), 2) - - # --- invalidate() --- - - def test_invalidate_specific_path(self): - c = cache_module.TemplateCache() - c.get(self.tmpl_path) - self.assertEqual(len(c), 1) - c.invalidate(self.tmpl_path) - self.assertEqual(len(c), 0) - - def test_invalidate_all(self): - paths = [] - for i in range(3): - p = os.path.join(self.tmpdir, 'tmpl%d.fsm' % i) - _write_template(p) - paths.append(p) - cache_module.TemplateCache().get(p) # warm separate caches - - c = cache_module.TemplateCache() - for p in paths: - c.get(p) - self.assertEqual(len(c), 3) - c.invalidate() - self.assertEqual(len(c), 0) - - # --- thread safety --- - - def test_concurrent_get_same_file(self): - """Multiple threads calling get() concurrently must not raise.""" - c = cache_module.TemplateCache() - errors = [] - - def worker(): - try: - c.get(self.tmpl_path) - except Exception as exc: # pylint: disable=broad-except - errors.append(exc) - - threads = [threading.Thread(target=worker) for _ in range(20)] - for t in threads: - t.start() - for t in threads: - t.join() - - self.assertEqual(errors, []) - - # --- missing file --- - - def test_missing_file_raises(self): - c = cache_module.TemplateCache() - with self.assertRaises(cache_module.TemplateCacheError): - c.get('/nonexistent/path/template.fsm') - - -# --------------------------------------------------------------------------- -# 2. Path-traversal hardening tests (CliTable._ValidatePath) -# --------------------------------------------------------------------------- - -class TestValidatePath(unittest.TestCase): - """Tests for CliTable._ValidatePath.""" - - def setUp(self): - self.tmpdir = tempfile.mkdtemp() - # Minimal CliTable without an index so we can call _ValidatePath directly. - clitable.CliTable.INDEX = {} - self.ct = clitable.CliTable(template_dir=self.tmpdir) - - def test_path_within_root_is_allowed(self): - safe = os.path.join(self.tmpdir, 'template.fsm') - result = self.ct._ValidatePath(safe) - self.assertEqual(result, os.path.normpath(os.path.abspath(safe))) - - def test_traversal_with_dotdot_raises(self): - evil = os.path.join(self.tmpdir, '..', 'etc', 'passwd') - with self.assertRaises(clitable.CliTableError): - self.ct._ValidatePath(evil) - - def test_absolute_path_outside_root_raises(self): - with self.assertRaises(clitable.CliTableError): - self.ct._ValidatePath('/etc/shadow') - - def test_no_template_dir_allows_any_path(self): - """With template_dir=None the check is skipped (backwards-compatible).""" - clitable.CliTable.INDEX = {} - ct = clitable.CliTable(template_dir=None) - # Should not raise even for a path outside any sandbox. - result = ct._ValidatePath('/tmp/something.fsm') - self.assertEqual(result, '/tmp/something.fsm') - - def test_traversal_in_template_names_to_files(self): - """_TemplateNamesToFiles must reject traversal in template names.""" - # Write a real index so the object is fully constructed. - idx_path = os.path.join(self.tmpdir, 'index') - with open(idx_path, 'w') as fh: - fh.write('Template, Command\n') - fh.write('tmpl.fsm, show version\n') - clitable.CliTable.INDEX = {} - ct = clitable.CliTable('index', self.tmpdir) - with self.assertRaises(clitable.CliTableError): - ct._TemplateNamesToFiles('../../etc/passwd') - - def test_read_index_traversal_raises(self): - """ReadIndex must reject an index_file that escapes template_dir.""" - clitable.CliTable.INDEX = {} - ct = clitable.CliTable(template_dir=self.tmpdir) - with self.assertRaises(clitable.CliTableError): - ct.ReadIndex('../../etc/passwd') - - -class TestMtimeCache(unittest.TestCase): - """Tests that CliTable.INDEX is keyed by mtime.""" - - def setUp(self): - self.tmpdir = tempfile.mkdtemp() - # Create a valid index file. - self.idx_path = os.path.join(self.tmpdir, 'index') - with open(self.idx_path, 'w') as fh: - fh.write('Template, Command\n') - fh.write('tmpl.fsm, show version\n') - - def test_cache_hit_same_mtime(self): - clitable.CliTable.INDEX = {} - ct1 = clitable.CliTable('index', self.tmpdir) - ct2 = clitable.CliTable('index', self.tmpdir) - self.assertIs(ct1.index, ct2.index) - # Only one entry in the global cache. - self.assertEqual(len(clitable.CliTable.INDEX), 1) - - def test_cache_miss_after_mtime_change(self): - clitable.CliTable.INDEX = {} - clitable.CliTable('index', self.tmpdir) - # Simulate file being updated. - new_mtime = os.path.getmtime(self.idx_path) + 2 - os.utime(self.idx_path, (new_mtime, new_mtime)) - # Re-read: must detect the new mtime and reparse. - clitable.CliTable.INDEX = {} # clear to simulate fresh process restart - ct2 = clitable.CliTable('index', self.tmpdir) - # Just verify we can read it without error; index is a fresh object. - self.assertIsNotNone(ct2.index) - - -# --------------------------------------------------------------------------- -# 3. ReDoS mitigation: max_input_len in ParseText / ParseTextToDicts -# --------------------------------------------------------------------------- - -_FSM_TEMPLATE = ( - 'Value Hostname (\\S+)\n' - '\n' - 'Start\n' - ' ^hostname ${Hostname}\n' - '\n' -) - - -class TestMaxInputLen(unittest.TestCase): - """Tests for the max_input_len parameter.""" - - def _make_fsm(self): - return textfsm.TextFSM(io.StringIO(_FSM_TEMPLATE)) - - # --- ParseText --- - - def test_parse_text_within_limit_succeeds(self): - fsm = self._make_fsm() - short_input = 'hostname router1\n' - result = fsm.ParseText(short_input, max_input_len=1000) - self.assertEqual(result, [['router1']]) - - def test_parse_text_at_exact_limit_succeeds(self): - fsm = self._make_fsm() - text = 'hostname router1\n' - result = fsm.ParseText(text, max_input_len=len(text)) - self.assertEqual(result, [['router1']]) - - def test_parse_text_over_limit_raises(self): - fsm = self._make_fsm() - long_input = 'hostname router1\n' * 100 - with self.assertRaises(textfsm.TextFSMError) as ctx: - fsm.ParseText(long_input, max_input_len=10) - self.assertIn('exceeds', str(ctx.exception).lower()) - - def test_parse_text_no_limit_default(self): - """Default (None) must never raise regardless of input size.""" - fsm = self._make_fsm() - large_input = 'hostname router1\n' * 10_000 - # The template has no Record action so implicit EOF appends a single row. - # The key assertion is that no TextFSMError is raised and at least one - # row is returned. - result = fsm.ParseText(large_input) - self.assertGreaterEqual(len(result), 1) - - def test_parse_text_empty_input_respects_limit(self): - """Empty string is shorter than any positive limit; must not raise.""" - fsm = self._make_fsm() - result = fsm.ParseText('', max_input_len=0) - self.assertEqual(result, []) - - # --- ParseTextToDicts --- - - def test_parse_text_to_dicts_within_limit(self): - fsm = self._make_fsm() - text = 'hostname router1\n' - result = fsm.ParseTextToDicts(text, max_input_len=1000) - self.assertEqual(result, [{'Hostname': 'router1'}]) - - def test_parse_text_to_dicts_over_limit_raises(self): - fsm = self._make_fsm() - long_input = 'hostname router1\n' * 200 - with self.assertRaises(textfsm.TextFSMError): - fsm.ParseTextToDicts(long_input, max_input_len=5) - - # --- exact boundary behaviour --- - - def test_one_over_limit_raises(self): - fsm = self._make_fsm() - text = 'hostname r\n' # 11 chars - with self.assertRaises(textfsm.TextFSMError): - fsm.ParseText(text, max_input_len=10) - - def test_one_under_limit_ok(self): - fsm = self._make_fsm() - text = 'hostname r\n' # 11 chars - result = fsm.ParseText(text, max_input_len=12) - self.assertEqual(result, [['r']]) - - -if __name__ == '__main__': - unittest.main() From 015bb585259b6450a368cf3a0c0f4c6bff3635ac Mon Sep 17 00:00:00 2001 From: Purvi Verma <101880020+purvi1508@users.noreply.github.com> Date: Tue, 24 Mar 2026 13:09:21 +0530 Subject: [PATCH 6/6] Add security tests for template caching and validation This test file includes tests for security improvements such as template caching, path traversal hardening, and ReDoS mitigation in the TextFSM library. --- tests/security_test.py | 352 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 352 insertions(+) create mode 100644 tests/security_test.py diff --git a/tests/security_test.py b/tests/security_test.py new file mode 100644 index 0000000..50c01e7 --- /dev/null +++ b/tests/security_test.py @@ -0,0 +1,352 @@ +#!/usr/bin/env python +# +# Copyright 2024 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. + +"""Tests covering the three security/robustness improvements: + + 1. TemplateCache – mtime-keyed template caching (cache.py) + 2. Path-traversal hardening – _ValidatePath in CliTable (clitable.py) + 3. ReDoS mitigation – max_input_len in ParseText / ParseTextToDicts (parser.py) +""" + +import io +import os +import tempfile +import threading +import time +import unittest + +import textfsm +from textfsm import cache as cache_module +from textfsm import clitable + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +_SIMPLE_TEMPLATE = ( + 'Value Name (\\S+)\n' + '\n' + 'Start\n' + ' ^Hello ${Name}\n' + '\n' +) + +_SIMPLE_INPUT = 'Hello world\n' + + +def _write_template(path, content=_SIMPLE_TEMPLATE): + with open(path, 'w') as fh: + fh.write(content) + + +# --------------------------------------------------------------------------- +# 1. TemplateCache tests +# --------------------------------------------------------------------------- + +class TestTemplateCache(unittest.TestCase): + """Tests for textfsm.cache.TemplateCache.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.tmpl_path = os.path.join(self.tmpdir, 'test.fsm') + _write_template(self.tmpl_path) + + # --- basic get / cache hit --- + + def test_get_returns_textfsm(self): + c = cache_module.TemplateCache() + fsm = c.get(self.tmpl_path) + self.assertIsInstance(fsm, textfsm.TextFSM) + + def test_repeated_get_returns_same_object(self): + """Same file unchanged: second call must return the identical object.""" + c = cache_module.TemplateCache() + fsm1 = c.get(self.tmpl_path) + fsm2 = c.get(self.tmpl_path) + self.assertIs(fsm1, fsm2) + + def test_cache_len_increases(self): + c = cache_module.TemplateCache() + self.assertEqual(len(c), 0) + c.get(self.tmpl_path) + self.assertEqual(len(c), 1) + + # --- mtime invalidation --- + + def test_mtime_change_invalidates_cache(self): + """After the template file is rewritten the cache must return a fresh FSM.""" + c = cache_module.TemplateCache() + fsm1 = c.get(self.tmpl_path) + + # Force a detectable mtime change (sleep just long enough). + time.sleep(0.05) + _write_template(self.tmpl_path, _SIMPLE_TEMPLATE + '\n') + + # Manually bump mtime if filesystem resolution is coarse. + new_mtime = os.path.getmtime(self.tmpl_path) + 1 + os.utime(self.tmpl_path, (new_mtime, new_mtime)) + + fsm2 = c.get(self.tmpl_path) + self.assertIsNot(fsm1, fsm2) + # Old entry evicted – cache should still contain exactly 1 entry. + self.assertEqual(len(c), 1) + + # --- max_size eviction --- + + def test_max_size_eviction(self): + paths = [] + for i in range(3): + p = os.path.join(self.tmpdir, 'tmpl%d.fsm' % i) + _write_template(p) + paths.append(p) + + c = cache_module.TemplateCache(max_size=2) + c.get(paths[0]) + c.get(paths[1]) + self.assertEqual(len(c), 2) + + # Adding a third entry should evict the oldest (paths[0]). + c.get(paths[2]) + self.assertLessEqual(len(c), 2) + + # --- invalidate() --- + + def test_invalidate_specific_path(self): + c = cache_module.TemplateCache() + c.get(self.tmpl_path) + self.assertEqual(len(c), 1) + c.invalidate(self.tmpl_path) + self.assertEqual(len(c), 0) + + def test_invalidate_all(self): + paths = [] + for i in range(3): + p = os.path.join(self.tmpdir, 'tmpl%d.fsm' % i) + _write_template(p) + paths.append(p) + cache_module.TemplateCache().get(p) # warm separate caches + + c = cache_module.TemplateCache() + for p in paths: + c.get(p) + self.assertEqual(len(c), 3) + c.invalidate() + self.assertEqual(len(c), 0) + + # --- thread safety --- + + def test_concurrent_get_same_file(self): + """Multiple threads calling get() concurrently must not raise.""" + c = cache_module.TemplateCache() + errors = [] + + def worker(): + try: + c.get(self.tmpl_path) + except Exception as exc: # pylint: disable=broad-except + errors.append(exc) + + threads = [threading.Thread(target=worker) for _ in range(20)] + for t in threads: + t.start() + for t in threads: + t.join() + + self.assertEqual(errors, []) + + # --- missing file --- + + def test_missing_file_raises(self): + c = cache_module.TemplateCache() + with self.assertRaises(cache_module.TemplateCacheError): + c.get('/nonexistent/path/template.fsm') + + +# --------------------------------------------------------------------------- +# 2. Path-traversal hardening tests (CliTable._ValidatePath) +# --------------------------------------------------------------------------- + +class TestValidatePath(unittest.TestCase): + """Tests for CliTable._ValidatePath.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + # Minimal CliTable without an index so we can call _ValidatePath directly. + clitable.CliTable.INDEX = {} + self.ct = clitable.CliTable(template_dir=self.tmpdir) + + def test_path_within_root_is_allowed(self): + safe = os.path.join(self.tmpdir, 'template.fsm') + result = self.ct._ValidatePath(safe) + self.assertEqual(result, os.path.normpath(os.path.abspath(safe))) + + def test_traversal_with_dotdot_raises(self): + evil = os.path.join(self.tmpdir, '..', 'etc', 'passwd') + with self.assertRaises(clitable.CliTableError): + self.ct._ValidatePath(evil) + + def test_absolute_path_outside_root_raises(self): + with self.assertRaises(clitable.CliTableError): + self.ct._ValidatePath('/etc/shadow') + + def test_no_template_dir_allows_any_path(self): + """With template_dir=None the check is skipped (backwards-compatible).""" + clitable.CliTable.INDEX = {} + ct = clitable.CliTable(template_dir=None) + # Should not raise even for a path outside any sandbox. + result = ct._ValidatePath('/tmp/something.fsm') + self.assertEqual(result, '/tmp/something.fsm') + + def test_traversal_in_template_names_to_files(self): + """_TemplateNamesToFiles must reject traversal in template names.""" + # Write a real index so the object is fully constructed. + idx_path = os.path.join(self.tmpdir, 'index') + with open(idx_path, 'w') as fh: + fh.write('Template, Command\n') + fh.write('tmpl.fsm, show version\n') + clitable.CliTable.INDEX = {} + ct = clitable.CliTable('index', self.tmpdir) + with self.assertRaises(clitable.CliTableError): + ct._TemplateNamesToFiles('../../etc/passwd') + + def test_read_index_traversal_raises(self): + """ReadIndex must reject an index_file that escapes template_dir.""" + clitable.CliTable.INDEX = {} + ct = clitable.CliTable(template_dir=self.tmpdir) + with self.assertRaises(clitable.CliTableError): + ct.ReadIndex('../../etc/passwd') + + +class TestMtimeCache(unittest.TestCase): + """Tests that CliTable.INDEX is keyed by mtime.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + # Create a valid index file. + self.idx_path = os.path.join(self.tmpdir, 'index') + with open(self.idx_path, 'w') as fh: + fh.write('Template, Command\n') + fh.write('tmpl.fsm, show version\n') + + def test_cache_hit_same_mtime(self): + clitable.CliTable.INDEX = {} + ct1 = clitable.CliTable('index', self.tmpdir) + ct2 = clitable.CliTable('index', self.tmpdir) + self.assertIs(ct1.index, ct2.index) + # Only one entry in the global cache. + self.assertEqual(len(clitable.CliTable.INDEX), 1) + + def test_cache_miss_after_mtime_change(self): + clitable.CliTable.INDEX = {} + clitable.CliTable('index', self.tmpdir) + # Simulate file being updated. + new_mtime = os.path.getmtime(self.idx_path) + 2 + os.utime(self.idx_path, (new_mtime, new_mtime)) + # Re-read: must detect the new mtime and reparse. + clitable.CliTable.INDEX = {} # clear to simulate fresh process restart + ct2 = clitable.CliTable('index', self.tmpdir) + # Just verify we can read it without error; index is a fresh object. + self.assertIsNotNone(ct2.index) + + +# --------------------------------------------------------------------------- +# 3. ReDoS mitigation: max_input_len in ParseText / ParseTextToDicts +# --------------------------------------------------------------------------- + +_FSM_TEMPLATE = ( + 'Value Hostname (\\S+)\n' + '\n' + 'Start\n' + ' ^hostname ${Hostname}\n' + '\n' +) + + +class TestMaxInputLen(unittest.TestCase): + """Tests for the max_input_len parameter.""" + + def _make_fsm(self): + return textfsm.TextFSM(io.StringIO(_FSM_TEMPLATE)) + + # --- ParseText --- + + def test_parse_text_within_limit_succeeds(self): + fsm = self._make_fsm() + short_input = 'hostname router1\n' + result = fsm.ParseText(short_input, max_input_len=1000) + self.assertEqual(result, [['router1']]) + + def test_parse_text_at_exact_limit_succeeds(self): + fsm = self._make_fsm() + text = 'hostname router1\n' + result = fsm.ParseText(text, max_input_len=len(text)) + self.assertEqual(result, [['router1']]) + + def test_parse_text_over_limit_raises(self): + fsm = self._make_fsm() + long_input = 'hostname router1\n' * 100 + with self.assertRaises(textfsm.TextFSMError) as ctx: + fsm.ParseText(long_input, max_input_len=10) + self.assertIn('exceeds', str(ctx.exception).lower()) + + def test_parse_text_no_limit_default(self): + """Default (None) must never raise regardless of input size.""" + fsm = self._make_fsm() + large_input = 'hostname router1\n' * 10_000 + # The template has no Record action so implicit EOF appends a single row. + # The key assertion is that no TextFSMError is raised and at least one + # row is returned. + result = fsm.ParseText(large_input) + self.assertGreaterEqual(len(result), 1) + + def test_parse_text_empty_input_respects_limit(self): + """Empty string is shorter than any positive limit; must not raise.""" + fsm = self._make_fsm() + result = fsm.ParseText('', max_input_len=0) + self.assertEqual(result, []) + + # --- ParseTextToDicts --- + + def test_parse_text_to_dicts_within_limit(self): + fsm = self._make_fsm() + text = 'hostname router1\n' + result = fsm.ParseTextToDicts(text, max_input_len=1000) + self.assertEqual(result, [{'Hostname': 'router1'}]) + + def test_parse_text_to_dicts_over_limit_raises(self): + fsm = self._make_fsm() + long_input = 'hostname router1\n' * 200 + with self.assertRaises(textfsm.TextFSMError): + fsm.ParseTextToDicts(long_input, max_input_len=5) + + # --- exact boundary behaviour --- + + def test_one_over_limit_raises(self): + fsm = self._make_fsm() + text = 'hostname r\n' # 11 chars + with self.assertRaises(textfsm.TextFSMError): + fsm.ParseText(text, max_input_len=10) + + def test_one_under_limit_ok(self): + fsm = self._make_fsm() + text = 'hostname r\n' # 11 chars + result = fsm.ParseText(text, max_input_len=12) + self.assertEqual(result, [['r']]) + + +if __name__ == '__main__': + unittest.main()