From fb4afe6f3c9225b3115fab3b865c434dc8c80f25 Mon Sep 17 00:00:00 2001 From: ahmed Date: Fri, 3 Oct 2025 01:39:45 -0400 Subject: [PATCH 1/6] add support for multiple lines yaml values and tests --- pyworker/job.py | 3 +- pyworker/util.py | 47 ++++++++++++++++++++++++++ tests/fixtures/handler_registered.yaml | 2 ++ tests/test_job.py | 3 +- 4 files changed, 53 insertions(+), 2 deletions(-) diff --git a/pyworker/job.py b/pyworker/job.py index df06ab1..38087ba 100644 --- a/pyworker/job.py +++ b/pyworker/job.py @@ -1,6 +1,6 @@ import re import yaml -from pyworker.util import get_current_time, get_time_delta +from pyworker.util import get_current_time, get_time_delta, squash_multiline_yaml _job_class_registry = {} @@ -76,6 +76,7 @@ def extract_extra_fields(extra_fields, extra_field_values): job_id, attempts, run_at, queue, handler, *extra_field_values = job_row extra_fields_dict = extract_extra_fields(extra_fields, extra_field_values) handler = handler.splitlines() + handler = squash_multiline_yaml(handler) class_name = extract_class_name(handler[1]) logger.debug("Found Job %d with class name: %s" % (job_id, class_name)) diff --git a/pyworker/util.py b/pyworker/util.py index 625910b..caba81b 100644 --- a/pyworker/util.py +++ b/pyworker/util.py @@ -8,3 +8,50 @@ def get_current_time(): def get_time_delta(**kwargs): return dateutil.relativedelta.relativedelta(**kwargs) + +def squash_multiline_yaml(lines): + """ + Given a list of YAML lines, squash lines with unclosed quotes (single or double) + into a single line until the quote closes. + Handles values that contain ':' safely. + """ + squashed = [] + buffer = None + quote_char = None + + for line in lines: + stripped = line.strip() + + if buffer is None: + if ":" in line: + # split only once (key : value) + key, val = line.split(":", 1) + val = val.lstrip() + + # quoted start but not closed + if val.startswith("'") and not (len(val) > 1 and val.endswith("'")): + buffer = line.rstrip("\n") + quote_char = "'" + continue + elif val.startswith('"') and not (len(val) > 1 and val.endswith('"')): + buffer = line.rstrip("\n") + quote_char = '"' + continue + + squashed.append(line) + + else: + # still accumulating + buffer += "\\n" + line.strip("\n") + + # closing quote? + if line.strip().endswith(quote_char): + squashed.append(buffer) + buffer = None + quote_char = None + + # in case YAML was malformed and never closed + if buffer is not None: + squashed.append(buffer) + + return squashed diff --git a/tests/fixtures/handler_registered.yaml b/tests/fixtures/handler_registered.yaml index 6ae53f6..566794a 100644 --- a/tests/fixtures/handler_registered.yaml +++ b/tests/fixtures/handler_registered.yaml @@ -8,6 +8,8 @@ object: !ruby/object:RegisteredJob multiline total_articles: 1000 is_blind: true + extra_new_line: 'line one + ' attributes: !ruby/object:ActiveRecord::AttributeSet attributes: !ruby/object:ActiveRecord::LazyAttributeHash types: {} diff --git a/tests/test_job.py b/tests/test_job.py index 3d482f6..21c8abf 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -145,7 +145,8 @@ def test_from_row_when_registered_class_returns_job_instance_with_attributes(sel 'title': 'review title', 'description': 'review description\nmultiline\n', 'total_articles': 1000, - 'is_blind': True + 'is_blind': True, + 'extra_new_line': 'line one\\n ', }) self.assertIsNone(job.reporter) From 7f256e5af35d171d9e8d48637908993078132530 Mon Sep 17 00:00:00 2001 From: ahmed Date: Fri, 3 Oct 2025 14:25:40 -0400 Subject: [PATCH 2/6] ignore unknown tags --- pyworker/job.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pyworker/job.py b/pyworker/job.py index 38087ba..04c2136 100644 --- a/pyworker/job.py +++ b/pyworker/job.py @@ -15,6 +15,12 @@ def __new__(meta, name, bases, class_dict): return cls +class IgnoreUnknownTagsLoader(yaml.SafeLoader): + def ignore_unknown(self, node): + return None + + + class Job(object, metaclass=Meta): """docstring for Job""" def __init__(self, class_name, database, logger, @@ -95,7 +101,7 @@ def extract_extra_fields(extra_fields, extra_field_values): logger.debug("Found attributes: %s" % str(attributes)) stripped = '\n'.join(['object:', ' attributes:'] + attributes) - payload = yaml.load(stripped, Loader=yaml.FullLoader) + payload = yaml.load(stripped, Loader=IgnoreUnknownTagsLoader) logger.debug("payload object: %s" % str(payload)) return target_class(class_name=class_name, logger=logger, From fba5c9d7235eb52a117cdbdccc5d9162d4a5f766 Mon Sep 17 00:00:00 2001 From: ahmed Date: Fri, 3 Oct 2025 18:15:34 -0400 Subject: [PATCH 3/6] apply suggestion from hossam --- pyworker/job.py | 27 ++++----------- pyworker/util.py | 47 -------------------------- tests/fixtures/handler_registered.yaml | 2 +- tests/test_job.py | 2 +- 4 files changed, 9 insertions(+), 69 deletions(-) diff --git a/pyworker/job.py b/pyworker/job.py index 04c2136..2ee47a1 100644 --- a/pyworker/job.py +++ b/pyworker/job.py @@ -1,6 +1,6 @@ import re import yaml -from pyworker.util import get_current_time, get_time_delta, squash_multiline_yaml +from pyworker.util import get_current_time, get_time_delta _job_class_registry = {} @@ -19,7 +19,8 @@ class IgnoreUnknownTagsLoader(yaml.SafeLoader): def ignore_unknown(self, node): return None - +def no_ruby_objects(loader, tag_suffix, node): + return loader.construct_mapping(node) class Job(object, metaclass=Meta): """docstring for Job""" @@ -60,19 +61,6 @@ def extract_class_name(line): else: return None - def extract_attributes(lines): - attributes = [] - collect = False - for line in lines: - if line.startswith(' raw_attributes:'): - collect = True - elif not line.startswith(' '): - if collect: - break - elif collect: - attributes.append(line) - return attributes - def extract_extra_fields(extra_fields, extra_field_values): if extra_fields is None or extra_field_values is None: return None @@ -82,7 +70,6 @@ def extract_extra_fields(extra_fields, extra_field_values): job_id, attempts, run_at, queue, handler, *extra_field_values = job_row extra_fields_dict = extract_extra_fields(extra_fields, extra_field_values) handler = handler.splitlines() - handler = squash_multiline_yaml(handler) class_name = extract_class_name(handler[1]) logger.debug("Found Job %d with class name: %s" % (job_id, class_name)) @@ -96,11 +83,11 @@ def extract_extra_fields(extra_fields, extra_field_values): abstract=True, extra_fields=extra_fields_dict, reporter=reporter, max_backoff_delay_seconds=max_backoff_delay_seconds ) - - attributes = extract_attributes(handler[2:]) + attributes = handler[3:] logger.debug("Found attributes: %s" % str(attributes)) - stripped = '\n'.join(['object:', ' attributes:'] + attributes) + stripped = '\n'.join(['object:', ' raw_attributes:'] + attributes) + yaml.SafeLoader.add_multi_constructor("!ruby/object:", no_ruby_objects) payload = yaml.load(stripped, Loader=IgnoreUnknownTagsLoader) logger.debug("payload object: %s" % str(payload)) @@ -108,7 +95,7 @@ def extract_extra_fields(extra_fields, extra_field_values): job_id=job_id, attempts=attempts, run_at=run_at, queue=queue, database=database, max_attempts=max_attempts, - attributes=payload['object']['attributes'], + attributes=payload['object']['raw_attributes'], abstract=False, extra_fields=extra_fields_dict, reporter=reporter, max_backoff_delay_seconds=max_backoff_delay_seconds ) diff --git a/pyworker/util.py b/pyworker/util.py index caba81b..625910b 100644 --- a/pyworker/util.py +++ b/pyworker/util.py @@ -8,50 +8,3 @@ def get_current_time(): def get_time_delta(**kwargs): return dateutil.relativedelta.relativedelta(**kwargs) - -def squash_multiline_yaml(lines): - """ - Given a list of YAML lines, squash lines with unclosed quotes (single or double) - into a single line until the quote closes. - Handles values that contain ':' safely. - """ - squashed = [] - buffer = None - quote_char = None - - for line in lines: - stripped = line.strip() - - if buffer is None: - if ":" in line: - # split only once (key : value) - key, val = line.split(":", 1) - val = val.lstrip() - - # quoted start but not closed - if val.startswith("'") and not (len(val) > 1 and val.endswith("'")): - buffer = line.rstrip("\n") - quote_char = "'" - continue - elif val.startswith('"') and not (len(val) > 1 and val.endswith('"')): - buffer = line.rstrip("\n") - quote_char = '"' - continue - - squashed.append(line) - - else: - # still accumulating - buffer += "\\n" + line.strip("\n") - - # closing quote? - if line.strip().endswith(quote_char): - squashed.append(buffer) - buffer = None - quote_char = None - - # in case YAML was malformed and never closed - if buffer is not None: - squashed.append(buffer) - - return squashed diff --git a/tests/fixtures/handler_registered.yaml b/tests/fixtures/handler_registered.yaml index 566794a..7f5b591 100644 --- a/tests/fixtures/handler_registered.yaml +++ b/tests/fixtures/handler_registered.yaml @@ -9,7 +9,7 @@ object: !ruby/object:RegisteredJob total_articles: 1000 is_blind: true extra_new_line: 'line one - ' + abc' attributes: !ruby/object:ActiveRecord::AttributeSet attributes: !ruby/object:ActiveRecord::LazyAttributeHash types: {} diff --git a/tests/test_job.py b/tests/test_job.py index 21c8abf..2062b5b 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -146,7 +146,7 @@ def test_from_row_when_registered_class_returns_job_instance_with_attributes(sel 'description': 'review description\nmultiline\n', 'total_articles': 1000, 'is_blind': True, - 'extra_new_line': 'line one\\n ', + 'extra_new_line': 'line one abc', }) self.assertIsNone(job.reporter) From 02b027a20bc915fd3ef4ac95e99c51b27c5369bb Mon Sep 17 00:00:00 2001 From: ahmed Date: Fri, 3 Oct 2025 19:59:36 -0400 Subject: [PATCH 4/6] register to ignoreUnknownTagsLoader instead of global --- pyworker/job.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pyworker/job.py b/pyworker/job.py index 2ee47a1..fec19e2 100644 --- a/pyworker/job.py +++ b/pyworker/job.py @@ -16,12 +16,14 @@ def __new__(meta, name, bases, class_dict): class IgnoreUnknownTagsLoader(yaml.SafeLoader): - def ignore_unknown(self, node): - return None + """Custom YAML loader that ignores unknown Ruby object tags.""" def no_ruby_objects(loader, tag_suffix, node): + # Construct mapping normally, ignoring Ruby-specific tags return loader.construct_mapping(node) + + class Job(object, metaclass=Meta): """docstring for Job""" def __init__(self, class_name, database, logger, @@ -87,7 +89,8 @@ def extract_extra_fields(extra_fields, extra_field_values): logger.debug("Found attributes: %s" % str(attributes)) stripped = '\n'.join(['object:', ' raw_attributes:'] + attributes) - yaml.SafeLoader.add_multi_constructor("!ruby/object:", no_ruby_objects) + + IgnoreUnknownTagsLoader.add_multi_constructor("!ruby/object:", no_ruby_objects) payload = yaml.load(stripped, Loader=IgnoreUnknownTagsLoader) logger.debug("payload object: %s" % str(payload)) From 10b2fd6880493a3f855d95771da6febabebf7765 Mon Sep 17 00:00:00 2001 From: ahmed Date: Fri, 3 Oct 2025 20:02:03 -0400 Subject: [PATCH 5/6] cleanup --- pyworker/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyworker/job.py b/pyworker/job.py index fec19e2..5c50fc1 100644 --- a/pyworker/job.py +++ b/pyworker/job.py @@ -18,12 +18,12 @@ def __new__(meta, name, bases, class_dict): class IgnoreUnknownTagsLoader(yaml.SafeLoader): """Custom YAML loader that ignores unknown Ruby object tags.""" + def no_ruby_objects(loader, tag_suffix, node): # Construct mapping normally, ignoring Ruby-specific tags return loader.construct_mapping(node) - class Job(object, metaclass=Meta): """docstring for Job""" def __init__(self, class_name, database, logger, From 07fcac92725d95d2e72f42cf51c609ed098b0bb2 Mon Sep 17 00:00:00 2001 From: Hossam Hammady Date: Mon, 6 Oct 2025 16:29:39 -0400 Subject: [PATCH 6/6] Remove redundant yaml loader & add more tests --- pyworker/job.py | 12 +++++------- tests/fixtures/handler_registered.yaml | 10 +++++++++- tests/test_job.py | 4 +++- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/pyworker/job.py b/pyworker/job.py index 5c50fc1..e0b30bf 100644 --- a/pyworker/job.py +++ b/pyworker/job.py @@ -2,6 +2,7 @@ import yaml from pyworker.util import get_current_time, get_time_delta + _job_class_registry = {} def _register_class(target_class): @@ -15,14 +16,13 @@ def __new__(meta, name, bases, class_dict): return cls -class IgnoreUnknownTagsLoader(yaml.SafeLoader): - """Custom YAML loader that ignores unknown Ruby object tags.""" - - +# Add a YAML constructor to ignore Ruby-specific tags (required once at module load time) def no_ruby_objects(loader, tag_suffix, node): # Construct mapping normally, ignoring Ruby-specific tags return loader.construct_mapping(node) +yaml.SafeLoader.add_multi_constructor("!ruby/object:", no_ruby_objects) + class Job(object, metaclass=Meta): """docstring for Job""" @@ -89,9 +89,7 @@ def extract_extra_fields(extra_fields, extra_field_values): logger.debug("Found attributes: %s" % str(attributes)) stripped = '\n'.join(['object:', ' raw_attributes:'] + attributes) - - IgnoreUnknownTagsLoader.add_multi_constructor("!ruby/object:", no_ruby_objects) - payload = yaml.load(stripped, Loader=IgnoreUnknownTagsLoader) + payload = yaml.load(stripped, Loader=yaml.SafeLoader) logger.debug("payload object: %s" % str(payload)) return target_class(class_name=class_name, logger=logger, diff --git a/tests/fixtures/handler_registered.yaml b/tests/fixtures/handler_registered.yaml index 7f5b591..a4aa7f5 100644 --- a/tests/fixtures/handler_registered.yaml +++ b/tests/fixtures/handler_registered.yaml @@ -8,8 +8,16 @@ object: !ruby/object:RegisteredJob multiline total_articles: 1000 is_blind: true - extra_new_line: 'line one + proper_multiline: |- + line one + line two + + line four + collapsed_first_newline: 'line one abc' + extra_new_line: 'line one + + line two' attributes: !ruby/object:ActiveRecord::AttributeSet attributes: !ruby/object:ActiveRecord::LazyAttributeHash types: {} diff --git a/tests/test_job.py b/tests/test_job.py index 2062b5b..c1e06c1 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -146,7 +146,9 @@ def test_from_row_when_registered_class_returns_job_instance_with_attributes(sel 'description': 'review description\nmultiline\n', 'total_articles': 1000, 'is_blind': True, - 'extra_new_line': 'line one abc', + 'collapsed_first_newline': 'line one abc', + 'extra_new_line': 'line one\nline two', + 'proper_multiline': 'line one\nline two\n\nline four' }) self.assertIsNone(job.reporter)