diff --git a/tests.py b/tests.py index dfebcb0..7ad1829 100644 --- a/tests.py +++ b/tests.py @@ -18,182 +18,152 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. +import doctest +import gc +import io +import os +import tempfile import treelog import unittest -import contextlib -import tempfile -import os -import sys -import hashlib -import io import warnings -import gc -import doctest from treelog import _path, _state from treelog.proto import Level, Data -class Log(unittest.TestCase): +@treelog.withcontext +def generate_test(): + 'decorated function for unit testing' - maxDiff = None + with treelog.warningfile('test.dat', 'wb') as f: + f.write(b'test3') - @contextlib.contextmanager - def assertSilent(self): - with capture() as captured: - yield - self.assertEqual(captured.stdout, '') - @treelog.withcontext - def generate_test(self): - with treelog.warningfile('test.dat', 'wb') as f: - f.write(b'test3') +def generate(): + 'generate log events for unit testing' - def generate(self): - treelog.user('my message') - with treelog.infofile('test.dat', 'w') as f: - f.write('test1') - with treelog.context('my context'): - with treelog.iter.plain('iter', 'abc') as items: - for c in items: - treelog.info(c) - with treelog.context('empty'): - pass - treelog.error('multiple..\n ..lines') - with treelog.userfile('test.dat', 'wb') as f: - treelog.info('generating') - f.write(b'test2') - self.generate_test() - with treelog.context('context step={}', 0) as format: - treelog.info('foo') - format(1) - treelog.info('bar') - treelog.errordata('same.dat', b'test3') - with treelog.debugfile('dbg.jpg', 'wb', type='image/jpg') as f: - f.write(b'test4') - treelog.debug('dbg') - treelog.warning('warn') + treelog.user('my message') + with treelog.infofile('test.dat', 'w') as f: + f.write('test1') + with treelog.context('my context'): + with treelog.iter.plain('iter', 'abc') as items: + for c in items: + treelog.info(c) + with treelog.context('empty'): + pass + treelog.error('multiple..\n ..lines') + with treelog.userfile('test.dat', 'wb') as f: + treelog.info('generating') + f.write(b'test2') + generate_test() + with treelog.context('context step={}', 0) as format: + treelog.info('foo') + format(1) + treelog.info('bar') + treelog.errordata('same.dat', b'test3') + with treelog.debugfile('dbg.jpg', 'wb', type='image/jpg') as f: + f.write(b'test4') + treelog.debug('dbg') + treelog.warning('warn') + + +class StdoutLog(unittest.TestCase): + + def test_output(self): + f = io.StringIO() + with treelog.set(treelog.StdoutLog(f)): + generate() + self.check_output(f) + + def check_output(self, f): + self.assertEqual(f.getvalue(), + 'my message\n' + 'test.dat [5 bytes]\n' + 'my context > iter 1 > a\n' + 'my context > iter 2 > b\n' + 'my context > iter 3 > c\n' + 'my context > multiple..\n' + ' > ..lines\n' + 'my context > test.dat > generating\n' + 'my context > test.dat [5 bytes]\n' + 'generate_test > test.dat [5 bytes]\n' + 'context step=0 > foo\n' + 'context step=1 > bar\n' + 'same.dat [5 bytes]\n' + 'dbg.jpg [image/jpg; 5 bytes]\n' + 'dbg\n' + 'warn\n') + + +class RichOutputLog(unittest.TestCase): + + def test_output(self): + f = io.StringIO() + with treelog.set(treelog.RichOutputLog(f)): + generate() + self.check_output(f) + + def check_output(self, f): + self.assertEqual(f.getvalue(), + '\x1b[1;34mmy message\x1b[0m\n' + 'test.dat > ' + '\r\x1b[K' + '\x1b[1mtest.dat [5 bytes]\x1b[0m\n' + 'my context > ' + 'iter 0 ' + '> \x1b[4D1 > ' + '\x1b[1ma\x1b[0m\nmy context > iter 1 > ' + '\x1b[4D2 > ' + '\x1b[1mb\x1b[0m\nmy context > iter 2 > ' + '\x1b[4D3 > ' + '\x1b[1mc\x1b[0m\nmy context > iter 3 > ' + '\x1b[9D\x1b[K' + 'empty > ' + '\x1b[8D\x1b[K' + '\x1b[1;31mmultiple..\x1b[0m\n > \x1b[1;31m ..lines\x1b[0m\nmy context > test.dat > ' + '\x1b[1mgenerating\x1b[0m\nmy context > test.dat > ' + '\x1b[11D\x1b[K' + '\x1b[1;34mtest.dat [5 bytes]\x1b[0m\nmy context > ' + '\r\x1b[Kgenerate_test > test.dat > ' + '\x1b[11D\x1b[K' + '\x1b[1;35mtest.dat [5 bytes]\x1b[0m\ngenerate_test > ' + '\r\x1b[K' + 'context step=0 > ' + '\x1b[1mfoo\x1b[0m\n' + 'context step=0 > ' + '\x1b[4D1 > ' + '\x1b[1mbar\x1b[0m\n' + 'context step=1 > ' + '\r\x1b[K' + '\x1b[1;31msame.dat [5 bytes]\x1b[0m\n' + 'dbg.jpg > ' + '\r\x1b[K' + '\x1b[1;30mdbg.jpg [image/jpg; 5 bytes]\x1b[0m\n' + '\x1b[1;30mdbg\x1b[0m\n' + '\x1b[1;35mwarn\x1b[0m\n') + + +class DataLog(unittest.TestCase): def test_output(self): - with self.assertSilent(), self.output_tester() as log, treelog.set(log): - self.generate() - - -class StdoutLog(Log): - - @contextlib.contextmanager - def output_tester(self): - with capture() as captured: - yield treelog.StdoutLog() - self.assertEqual(captured.stdout, - 'my message\n' - 'test.dat [5 bytes]\n' - 'my context > iter 1 > a\n' - 'my context > iter 2 > b\n' - 'my context > iter 3 > c\n' - 'my context > multiple..\n' - ' ..lines\n' - 'my context > test.dat > generating\n' - 'my context > test.dat [5 bytes]\n' - 'generate_test > test.dat [5 bytes]\n' - 'context step=0 > foo\n' - 'context step=1 > bar\n' - 'same.dat [5 bytes]\n' - 'dbg.jpg [image/jpg; 5 bytes]\n' - 'dbg\n' - 'warn\n') - self.assertEqual(captured.stderr, '') - - -class StderrLog(Log): - - @contextlib.contextmanager - def output_tester(self): - with capture() as captured: - yield treelog.StderrLog() - self.assertEqual(captured.stderr, - 'my message\n' - 'test.dat [5 bytes]\n' - 'my context > iter 1 > a\n' - 'my context > iter 2 > b\n' - 'my context > iter 3 > c\n' - 'my context > multiple..\n' - ' ..lines\n' - 'my context > test.dat > generating\n' - 'my context > test.dat [5 bytes]\n' - 'generate_test > test.dat [5 bytes]\n' - 'context step=0 > foo\n' - 'context step=1 > bar\n' - 'same.dat [5 bytes]\n' - 'dbg.jpg [image/jpg; 5 bytes]\n' - 'dbg\n' - 'warn\n') - self.assertEqual(captured.stdout, '') - - -class RichOutputLog(Log): - - @contextlib.contextmanager - def output_tester(self): - with capture() as captured: - yield treelog.RichOutputLog() - self.assertEqual(captured.stdout, - '\x1b[1;34mmy message\x1b[0m\n' - 'test.dat > ' - '\r\x1b[K' - '\x1b[1mtest.dat [5 bytes]\x1b[0m\n' - 'my context > ' - 'iter 0 ' - '> \x1b[4D1 > ' - '\x1b[1ma\x1b[0m\nmy context > iter 1 > ' - '\x1b[4D2 > ' - '\x1b[1mb\x1b[0m\nmy context > iter 2 > ' - '\x1b[4D3 > ' - '\x1b[1mc\x1b[0m\nmy context > iter 3 > ' - '\x1b[9D\x1b[K' - 'empty > ' - '\x1b[8D\x1b[K' - '\x1b[1;31mmultiple..\n ..lines\x1b[0m\nmy context > test.dat > ' - '\x1b[1mgenerating\x1b[0m\nmy context > test.dat > ' - '\x1b[11D\x1b[K' - '\x1b[1;34mtest.dat [5 bytes]\x1b[0m\nmy context > ' - '\r\x1b[Kgenerate_test > test.dat > ' - '\x1b[11D\x1b[K' - '\x1b[1;35mtest.dat [5 bytes]\x1b[0m\ngenerate_test > ' - '\r\x1b[K' - 'context step=0 > ' - '\x1b[1mfoo\x1b[0m\n' - 'context step=0 > ' - '\x1b[4D1 > ' - '\x1b[1mbar\x1b[0m\n' - 'context step=1 > ' - '\r\x1b[K' - '\x1b[1;31msame.dat [5 bytes]\x1b[0m\n' - 'dbg.jpg > ' - '\r\x1b[K' - '\x1b[1;30mdbg.jpg [image/jpg; 5 bytes]\x1b[0m\n' - '\x1b[1;30mdbg\x1b[0m\n' - '\x1b[1;35mwarn\x1b[0m\n') - - -class DataLog(Log): - - @contextlib.contextmanager - def output_tester(self): with tempfile.TemporaryDirectory() as tmpdir: - yield treelog.DataLog(tmpdir) - self.assertEqual(set(os.listdir(tmpdir)), { - 'test.dat', 'test-1.dat', 'test-2.dat', 'same.dat', 'dbg.jpg'}) - with open(os.path.join(tmpdir, 'test.dat'), 'r') as f: - self.assertEqual(f.read(), 'test1') - with open(os.path.join(tmpdir, 'test-1.dat'), 'rb') as f: - self.assertEqual(f.read(), b'test2') - with open(os.path.join(tmpdir, 'test-2.dat'), 'rb') as f: - self.assertEqual(f.read(), b'test3') - with open(os.path.join(tmpdir, 'same.dat'), 'rb') as f: - self.assertEqual(f.read(), b'test3') - with open(os.path.join(tmpdir, 'dbg.jpg'), 'r') as f: - self.assertEqual(f.read(), 'test4') + with treelog.set(treelog.DataLog(tmpdir)): + generate() + self.check_output(tmpdir) + + def check_output(self, tmpdir): + self.assertEqual(set(os.listdir(tmpdir)), { + 'test.dat', 'test-1.dat', 'test-2.dat', 'same.dat', 'dbg.jpg'}) + with open(os.path.join(tmpdir, 'test.dat'), 'r') as f: + self.assertEqual(f.read(), 'test1') + with open(os.path.join(tmpdir, 'test-1.dat'), 'rb') as f: + self.assertEqual(f.read(), b'test2') + with open(os.path.join(tmpdir, 'test-2.dat'), 'rb') as f: + self.assertEqual(f.read(), b'test3') + with open(os.path.join(tmpdir, 'same.dat'), 'rb') as f: + self.assertEqual(f.read(), b'test3') + with open(os.path.join(tmpdir, 'dbg.jpg'), 'r') as f: + self.assertEqual(f.read(), 'test4') @unittest.skipIf(not _path.supports_fd, 'dir_fd not supported on platform') def test_move_outdir(self): @@ -208,69 +178,71 @@ def test_move_outdir(self): self.assertEqual(os.listdir(outdira), []) -class HtmlLog(Log): +class HtmlLog(unittest.TestCase): - @contextlib.contextmanager - def output_tester(self): + def test_output(self): with tempfile.TemporaryDirectory() as tmpdir: - tests = ['b444ac06613fc8d63795be9ad0beaf55011936ac.dat', '109f4b3c50d7b0df729d299bc6f8e9ef9066971f.dat', - '3ebfa301dc59196f18593c45e519287a23297589.dat', '1ff2b3704aede04eecb51e50ca698efd50a1379b.jpg'] - with self.assertSilent(), treelog.HtmlLog(tmpdir, title='test') as htmllog: - yield htmllog - self.assertEqual(htmllog.filename, 'log.html') - self.assertGreater(set(os.listdir(tmpdir)), {'log.html', *tests}) - with open(os.path.join(tmpdir, 'log.html'), 'r') as f: - lines = f.readlines() - self.assertIn('\n', lines) - self.assertEqual(lines[lines.index('\n'):], [ - '\n', - '\n', - '
\n', - '
my message
\n', - '
test.dat
\n', - '
my context
\n', - '
iter 1
\n', - '
a
\n', - '
\n', - '
iter 2
\n', - '
b
\n', - '
\n', - '
iter 3
\n', - '
c
\n', - '
\n', - '
multiple..\n', - ' ..lines
\n', - '
test.dat
\n', - '
generating
\n', - '
\n', - '\n', - '
\n', - '
generate_test
\n', - '\n', - '
\n', - '
context step=0
\n', - '
foo
\n', - '
\n', - '
context step=1
\n', - '
bar
\n', - '
\n', - '
same.dat
\n', - '
dbg.jpg
\n', - '
dbg
\n', - '
warn
\n', - '
\n']) - for i, test in enumerate(tests, 1): - with open(os.path.join(tmpdir, test), 'rb') as f: - self.assertEqual(f.read(), b'test%i' % i) + with treelog.HtmlLog(tmpdir, title='test') as htmllog, treelog.set(htmllog): + generate() + self.check_output(tmpdir, htmllog.filename) + + def check_output(self, tmpdir, filename): + tests = ['b444ac06613fc8d63795be9ad0beaf55011936ac.dat', '109f4b3c50d7b0df729d299bc6f8e9ef9066971f.dat', + '3ebfa301dc59196f18593c45e519287a23297589.dat', '1ff2b3704aede04eecb51e50ca698efd50a1379b.jpg'] + self.assertEqual(filename, 'log.html') + self.assertGreater(set(os.listdir(tmpdir)), {'log.html', *tests}) + with open(os.path.join(tmpdir, 'log.html'), 'r') as f: + lines = f.readlines() + self.assertIn('\n', lines) + self.assertEqual(lines[lines.index('\n'):], [ + '\n', + '\n', + '
\n', + '
my message
\n', + '
test.dat
\n', + '
my context
\n', + '
iter 1
\n', + '
a
\n', + '
\n', + '
iter 2
\n', + '
b
\n', + '
\n', + '
iter 3
\n', + '
c
\n', + '
\n', + '
multiple..\n', + ' ..lines
\n', + '
test.dat
\n', + '
generating
\n', + '
\n', + '\n', + '
\n', + '
generate_test
\n', + '\n', + '
\n', + '
context step=0
\n', + '
foo
\n', + '
\n', + '
context step=1
\n', + '
bar
\n', + '
\n', + '
same.dat
\n', + '
dbg.jpg
\n', + '
dbg
\n', + '
warn
\n', + '
\n']) + for i, test in enumerate(tests, 1): + with open(os.path.join(tmpdir, test), 'rb') as f: + self.assertEqual(f.read(), b'test%i' % i) @unittest.skipIf(not _path.supports_fd, 'dir_fd not supported on platform') def test_move_outdir(self): with tempfile.TemporaryDirectory() as tmpdir: outdira = os.path.join(tmpdir, 'a') outdirb = os.path.join(tmpdir, 'b') - with silent(), treelog.HtmlLog(outdira) as log: + with treelog.HtmlLog(outdira) as log: os.rename(outdira, outdirb) os.mkdir(outdira) log.write(Data('dat', b''), Level.info) @@ -279,24 +251,45 @@ def test_move_outdir(self): def test_filename_sequence(self): with tempfile.TemporaryDirectory() as tmpdir: - with silent(), treelog.HtmlLog(tmpdir) as log: + with treelog.HtmlLog(tmpdir) as log: pass self.assertTrue(os.path.exists(os.path.join(tmpdir, 'log.html'))) - with silent(), treelog.HtmlLog(tmpdir) as log: + with treelog.HtmlLog(tmpdir) as log: pass self.assertTrue(os.path.exists(os.path.join(tmpdir, 'log-1.html'))) - with silent(), treelog.HtmlLog(tmpdir) as log: + with treelog.HtmlLog(tmpdir) as log: pass self.assertTrue(os.path.exists(os.path.join(tmpdir, 'log-2.html'))) -class RecordLog(Log): +class RecordLog(unittest.TestCase): - @contextlib.contextmanager - def output_tester(self): - recordlog = treelog.RecordLog(simplify=False) - yield recordlog - self.assertEqual(recordlog._messages, [ + simplify = False + + def test_output(self): + recordlog = treelog.RecordLog(simplify=self.simplify) + with treelog.set(recordlog): + generate() + self.check_output(recordlog._messages) + with self.subTest('replay to StdoutLog'): + f = io.StringIO() + recordlog.replay(treelog.StdoutLog(f)) + StdoutLog.check_output(self, f) + with self.subTest('replay to DataLog'), tempfile.TemporaryDirectory() as tmpdir: + recordlog.replay(treelog.DataLog(tmpdir)) + DataLog.check_output(self, tmpdir) + with self.subTest('replay to HtmlLog'), tempfile.TemporaryDirectory() as tmpdir: + with treelog.HtmlLog(tmpdir, title='test') as htmllog: + recordlog.replay(htmllog) + HtmlLog.check_output(self, tmpdir, htmllog.filename) + if not self.simplify: + with self.subTest('replay to RichOutputLog'): + f = io.StringIO() + recordlog.replay(treelog.RichOutputLog(f)) + RichOutputLog.check_output(self, f) + + def check_output(self, messages): + self.assertEqual(messages, [ ('write', 'my message', Level.user), ('pushcontext', 'test.dat'), ('popcontext',), @@ -334,24 +327,20 @@ def output_tester(self): ('write', Data('dbg.jpg', b'test4', type='image/jpg'), Level.debug), ('write', 'dbg', Level.debug), ('write', 'warn', Level.warning)]) - for Log in StdoutLog, DataLog, HtmlLog, RichOutputLog: - with self.subTest('replay to {}'.format(Log.__name__)), Log.output_tester(self) as log: - recordlog.replay(log) def test_replay_in_current(self): - recordlog = treelog.RecordLog() + recordlog = treelog.RecordLog(simplify=self.simplify) recordlog.write('test', level=Level.info) - with self.assertSilent(), treelog.set(treelog.LoggingLog()), self.assertLogs('nutils'): + with treelog.set(treelog.LoggingLog()), self.assertLogs('nutils'): recordlog.replay() -class SimplifiedRecordLog(Log): +class SimplifiedRecordLog(RecordLog): - @contextlib.contextmanager - def output_tester(self): - recordlog = treelog.RecordLog(simplify=True) - yield recordlog - self.assertEqual(recordlog._messages, [ + simplify = True + + def check_output(self, messages): + self.assertEqual(messages, [ ('write', 'my message', Level.user), ('write', Data('test.dat', b'test1'), Level.info), ('pushcontext', 'my context'), @@ -378,54 +367,24 @@ def output_tester(self): ('write', Data('dbg.jpg', b'test4', type='image/jpg'), Level.debug), ('write', 'dbg', Level.debug), ('write', 'warn', Level.warning)]) - for Log in StdoutLog, DataLog, HtmlLog: - with self.subTest('replay to {}'.format(Log.__name__)), Log.output_tester(self) as log: - recordlog.replay(log) - - def test_replay_in_current(self): - recordlog = treelog.RecordLog() - recordlog.write('test', level=Level.info) - with self.assertSilent(), treelog.set(treelog.LoggingLog()), self.assertLogs('nutils'): - recordlog.replay() -class TeeLogTestLog: +class TeeLog(unittest.TestCase): - def __init__(self, dir, update, filenos): - self._dir = dir - self._update = update - self.filenos = filenos - - def pushcontext(self, title): - pass - - def popcontext(self): - pass - - def recontext(self, title): - pass - - def write(self, text, level): - pass - - @contextlib.contextmanager - def open(self, filename, mode, level): - with open(os.path.join(self._dir, filename), mode+'+' if self._update else mode) as f: - self.filenos.add(f.fileno()) - try: - yield f - finally: - self.filenos.remove(f.fileno()) - - -class TeeLog(Log): - - @contextlib.contextmanager - def output_tester(self): - with DataLog.output_tester(self) as datalog, \ - RecordLog.output_tester(self) as recordlog, \ - RichOutputLog.output_tester(self) as richoutputlog: - yield treelog.TeeLog(richoutputlog, treelog.TeeLog(datalog, recordlog)) + def test_output(self): + f = io.StringIO() + with tempfile.TemporaryDirectory() as tmpdir: + datalog = treelog.DataLog(tmpdir) + recordlog = treelog.RecordLog(simplify=False) + richoutputlog = treelog.RichOutputLog(f) + with treelog.set(treelog.TeeLog(richoutputlog, treelog.TeeLog(datalog, recordlog))): + generate() + with self.subTest('DataLog'): + DataLog.check_output(self, tmpdir) + with self.subTest('RecordLog'): + RecordLog.check_output(self, recordlog._messages) + with self.subTest('RichOutputLog'): + RichOutputLog.check_output(self, f) def test_open_datalog_datalog_samedir(self): with tempfile.TemporaryDirectory() as tmpdir: @@ -438,13 +397,16 @@ def test_open_datalog_datalog_samedir(self): self.assertEqual(f.read(), b'test') -class FilterMinLog(Log): +class FilterMinLog(unittest.TestCase): - @contextlib.contextmanager - def output_tester(self): + def test_output(self): recordlog = treelog.RecordLog() - yield treelog.FilterLog(recordlog, minlevel=Level.user) - self.assertEqual(recordlog._messages, [ + with treelog.set(treelog.FilterLog(recordlog, minlevel=Level.user)): + generate() + self.check_output(recordlog._messages) + + def check_output(self, messages): + self.assertEqual(messages, [ ('write', 'my message', Level.user), ('pushcontext', 'my context'), ('write', 'multiple..\n ..lines', Level.error), @@ -456,13 +418,16 @@ def output_tester(self): ('write', 'warn', Level.warning)]) -class FilterMaxLog(Log): +class FilterMaxLog(unittest.TestCase): - @contextlib.contextmanager - def output_tester(self): + def test_output(self): recordlog = treelog.RecordLog() - yield treelog.FilterLog(recordlog, maxlevel=Level.user) - self.assertEqual(recordlog._messages, [ + with treelog.set(treelog.FilterLog(recordlog, maxlevel=Level.user)): + generate() + self.check_output(recordlog._messages) + + def check_output(self, messages): + self.assertEqual(messages, [ ('write', 'my message', Level.user), ('write', Data('test.dat', b'test1'), Level.info), ('pushcontext', 'my context'), @@ -485,13 +450,16 @@ def output_tester(self): ('write', 'dbg', Level.debug)]) -class FilterMinMaxLog(Log): +class FilterMinMaxLog(unittest.TestCase): - @contextlib.contextmanager - def output_tester(self): + def test_output(self): recordlog = treelog.RecordLog() - yield treelog.FilterLog(recordlog, minlevel=Level.info, maxlevel=Level.warning) - self.assertEqual(recordlog._messages, [ + with treelog.set(treelog.FilterLog(recordlog, minlevel=Level.info, maxlevel=Level.warning)): + generate() + self.check_output(recordlog._messages) + + def check_output(self, messages): + self.assertEqual(messages, [ ('write', 'my message', Level.user), ('write', Data('test.dat', b'test1'), Level.info), ('pushcontext', 'my context'), @@ -515,13 +483,15 @@ def output_tester(self): ('write', 'warn', Level.warning)]) -class LoggingLog(Log): +class LoggingLog(unittest.TestCase): - @contextlib.contextmanager - def output_tester(self): - with self.assertLogs('nutils') as cm: - yield treelog.LoggingLog() - self.assertEqual(cm.output, [ + def test_output(self): + with self.assertLogs('nutils') as cm, treelog.set(treelog.LoggingLog()): + generate() + self.check_output(cm.output) + + def check_output(self, output): + self.assertEqual(output, [ 'Level 25:nutils:my message', 'INFO:nutils:test.dat [5 bytes]', 'INFO:nutils:my context > iter 1 > a', @@ -537,12 +507,11 @@ def output_tester(self): 'WARNING:nutils:warn']) -class NullLog(Log): +class NullLog(unittest.TestCase): - @contextlib.contextmanager - def output_tester(self): - with self.assertSilent(): - yield treelog.NullLog() + def test_output(self): + with treelog.set(treelog.NullLog()): + generate() def test_disable(self): with treelog.disable(): @@ -684,27 +653,4 @@ def test_docs(self): doctest.testmod(treelog) -del Log # hide from unittest discovery - -# INTERNALS - - -@contextlib.contextmanager -def capture(): - with tempfile.TemporaryFile('w+', newline='') as stdout, tempfile.TemporaryFile('w+', newline='') as stderr: - class captured: - pass - with contextlib.redirect_stdout(stdout), contextlib.redirect_stderr(stderr): - yield captured - stdout.seek(0) - captured.stdout = stdout.read() - stderr.seek(0) - captured.stderr = stderr.read() - - -@contextlib.contextmanager -def silent(): - with open(os.devnull, 'w') as f, contextlib.redirect_stdout(f): - yield - -# vim:sw=2:sts=2:et +# vim:sw=4:sts=4:et diff --git a/treelog/__init__.py b/treelog/__init__.py index 46130c5..21b5836 100644 --- a/treelog/__init__.py +++ b/treelog/__init__.py @@ -28,14 +28,9 @@ 'NullLog', 'RecordLog', 'RichOutputLog', - 'StderrLog', 'StdoutLog', 'TeeLog', } -_legacy = { - 'version': __version__, - 'Log': None, -} def __dir__(): return ( @@ -44,7 +39,6 @@ def __dir__(): *_state_attrs, *_state_funcs, *_log_objs, - *_legacy, ) def __getattr__(attr): @@ -59,8 +53,6 @@ def __getattr__(attr): obj = getattr(m, attr) elif attr in _sub_mods: obj = import_module(f'.{attr}', 'treelog') - elif attr in _legacy: - obj = _legacy[attr] else: raise AttributeError(attr) globals()[attr] = obj diff --git a/treelog/_context.py b/treelog/_context.py deleted file mode 100644 index 8e51013..0000000 --- a/treelog/_context.py +++ /dev/null @@ -1,28 +0,0 @@ -class ContextLog: - '''Base class for loggers that keep track of the current list of contexts. - - The base class implements :meth:`context` and :meth:`open` which keep the - attribute :attr:`currentcontext` up-to-date. - - .. attribute:: currentcontext - - A :class:`list` of contexts (:class:`str`\\s) that are currently active. - ''' - - def __init__(self) -> None: - self.currentcontext = [] # type: typing.List[str] - - def pushcontext(self, title: str) -> None: - self.currentcontext.append(title) - self.contextchangedhook() - - def popcontext(self) -> None: - self.currentcontext.pop() - self.contextchangedhook() - - def recontext(self, title: str) -> None: - self.currentcontext[-1] = title - self.contextchangedhook() - - def contextchangedhook(self) -> None: - pass diff --git a/treelog/_data.py b/treelog/_data.py index e8c04ff..e51e685 100644 --- a/treelog/_data.py +++ b/treelog/_data.py @@ -2,7 +2,7 @@ import os import typing -from ._path import makedirs, sequence +from ._path import makedirs, sequence, non_existent from .proto import Level, Data @@ -24,13 +24,6 @@ def recontext(self, title: str) -> None: def write(self, msg, level: Level) -> None: if isinstance(msg, Data): - for filename in self._names(msg.name): - try: - f = (self._path / filename).open('xb') - except FileExistsError: - continue - break - else: - raise ValueError('all filenames are in use') + _, f = non_existent(self._path, self._names(msg.name), lambda p: p.open('xb')) with f: f.write(msg.data) diff --git a/treelog/_html.py b/treelog/_html.py index c714f90..467091b 100644 --- a/treelog/_html.py +++ b/treelog/_html.py @@ -8,7 +8,7 @@ import urllib.parse import warnings -from ._path import makedirs, sequence +from ._path import makedirs, sequence, non_existent from .proto import Level, Data @@ -17,14 +17,7 @@ class HtmlLog: def __init__(self, dirpath: str, *, filename: str = 'log.html', title: typing.Optional[str] = None, htmltitle: typing.Optional[str] = None, favicon: typing.Optional[str] = None) -> None: self._path = makedirs(dirpath) - for self.filename in sequence(filename): - try: - self._file = (self._path / self.filename).open('x', encoding='utf-8') - except FileExistsError: - continue - break - else: - raise ValueError('all filenames are in use') + self.filename, self._file = non_existent(self._path, sequence(filename), lambda p: p.open('x', encoding='utf-8')) css = self._write_hash(CSS.encode(), '.css') js = self._write_hash(JS.encode(), '.js') if title is None: diff --git a/treelog/_logging.py b/treelog/_logging.py index 7b3b514..93f6d0f 100644 --- a/treelog/_logging.py +++ b/treelog/_logging.py @@ -1,11 +1,10 @@ import logging import typing -from ._context import ContextLog from .proto import Level -class LoggingLog(ContextLog): +class LoggingLog: '''Log to Python's built-in logging facility.''' # type: typing.ClassVar[typing.Tuple[int, int, int, int, int]] @@ -13,7 +12,16 @@ class LoggingLog(ContextLog): def __init__(self, name: str = 'nutils') -> None: self._logger = logging.getLogger(name) - super().__init__() + self.currentcontext = [] # type: typing.List[str] + + def pushcontext(self, title: str) -> None: + self.currentcontext.append(title) + + def popcontext(self) -> None: + self.currentcontext.pop() + + def recontext(self, title: str) -> None: + self.currentcontext[-1] = title def write(self, msg, level: Level, data: typing.Optional[bytes] = None) -> None: self._logger.log(self._levels[level.value], ' > '.join( diff --git a/treelog/_path.py b/treelog/_path.py index 0885f76..ee8398c 100644 --- a/treelog/_path.py +++ b/treelog/_path.py @@ -27,6 +27,23 @@ def sequence(filename: str) -> typing.Generator[str, None, None]: i += 1 +def non_existent(path, names, f): + if isinstance(path, str): + path = pathlib.Path(path) + for name in names: + try: + return name, f(path / name) + except FileExistsError: + pass + except PermissionError: + # On Windows, trying to open a path that exists as a directory + # triggers a permission error. To avoid a runaway iteration, we + # continue to the next name only if the path indeed exists. + if not isinstance(path, pathlib.Path) or not (path / name).exists(): + raise + raise Exception('names exhausted') + + class _FDDirPath: def __init__(self, dir_fd: int) -> None: diff --git a/treelog/_record.py b/treelog/_record.py index b647034..399ce21 100644 --- a/treelog/_record.py +++ b/treelog/_record.py @@ -61,7 +61,6 @@ def replay(self, log: typing.Optional[Log] = None) -> None: All recorded messages and files will be written to the log that is either directly specified or currently active.''' - files = {} if log is None: from ._state import current as log for cmd, *args in self._messages: diff --git a/treelog/_richoutput.py b/treelog/_richoutput.py index bcd6b3e..a0fd774 100644 --- a/treelog/_richoutput.py +++ b/treelog/_richoutput.py @@ -1,11 +1,10 @@ import sys import typing -from ._context import ContextLog from .proto import Level -class RichOutputLog(ContextLog): +class RichOutputLog: '''Output rich (colored,unicode) text to stream.''' _cmap = ( @@ -15,10 +14,23 @@ class RichOutputLog(ContextLog): '\033[1;35m', # warning: bold purple '\033[1;31m') # error: bold red - def __init__(self) -> None: - super().__init__() + def __init__(self, file=sys.stdout) -> None: self._current = '' # currently printed context + self.file = file set_ansi_console() + self.currentcontext = [] # type: typing.List[str] + + def pushcontext(self, title: str) -> None: + self.currentcontext.append(title) + self.contextchangedhook() + + def popcontext(self) -> None: + self.currentcontext.pop() + self.contextchangedhook() + + def recontext(self, title: str) -> None: + self.currentcontext[-1] = title + self.contextchangedhook() def contextchangedhook(self) -> None: _current = ''.join(item + ' > ' for item in self.currentcontext) @@ -34,13 +46,16 @@ def contextchangedhook(self) -> None: items.append(_current[n:]) if len(_current) < len(self._current): items.append('\033[K') - sys.stdout.write(''.join(items)) - sys.stdout.flush() + self.file.write(''.join(items)) + self.file.flush() self._current = _current def write(self, msg, level: Level) -> None: - sys.stdout.write( - ''.join([self._cmap[level.value], str(msg), '\033[0m\n', self._current])) + msg = str(msg) + if self._current and '\n' in msg: + msg = msg.replace('\n', '\033[0m\n' + ' > '.rjust(len(self._current)) + self._cmap[level.value]) + self.file.write( + ''.join([self._cmap[level.value], msg, '\033[0m\n', self._current])) def first(items: typing.Iterable[bool]) -> int: diff --git a/treelog/_stderr.py b/treelog/_stderr.py deleted file mode 100644 index 4953355..0000000 --- a/treelog/_stderr.py +++ /dev/null @@ -1,11 +0,0 @@ -import sys - -from ._context import ContextLog -from .proto import Level - - -class StderrLog(ContextLog): - '''Output plain text to stream.''' - - def write(self, msg, level: Level) -> None: - print(*self.currentcontext, msg, sep=' > ', file=sys.stderr) diff --git a/treelog/_stdout.py b/treelog/_stdout.py index 815825e..fcc846c 100644 --- a/treelog/_stdout.py +++ b/treelog/_stdout.py @@ -1,9 +1,26 @@ +import sys + from . import proto -from ._context import ContextLog -class StdoutLog(ContextLog): +class StdoutLog: '''Output plain text to stream.''' + def __init__(self, file=sys.stdout): + self.file = file + self.currentcontext = [] # type: typing.List[str] + + def pushcontext(self, title: str) -> None: + self.currentcontext.append(title + ' > ') + + def popcontext(self) -> None: + self.currentcontext.pop() + + def recontext(self, title: str) -> None: + self.currentcontext[-1] = title + ' > ' + def write(self, msg, level: proto.Level) -> None: - print(*self.currentcontext, msg, sep=' > ') + if self.currentcontext: + prefix = ''.join(self.currentcontext) + msg = prefix + str(msg).replace('\n', '\n' + ' > '.rjust(len(prefix))) + print(msg, file=self.file)