Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tabula.vscode.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ on:
workflow_dispatch:

env:
NODE_VERSION: 20.x
NODE_VERSION: 24.x

jobs:
tabula_vscode:
Expand Down
110 changes: 70 additions & 40 deletions internal/markdown/process.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package markdown

import (
"errors"
"fmt"
"io"
"strings"
"sync"

"github.com/pblazh/tabula/internal/csv"
)
Expand All @@ -18,60 +20,88 @@ func Process(
return err
}

var result []chunk
result := make([][]*chunk, len(chunks))
var wg sync.WaitGroup
var mu sync.Mutex
var errs []error

for i := range chunks {
ch := chunks[i]
write := csv.Write
wrap := wrapCodeBlock
for i, ch := range chunks {
result[i] = make([]*chunk, 0)
scriptChunk := getScriptChunk(chunks, i)

if ch.kind == csvKind && config.Align {
write = csv.WriteAligned
if chunkNeedsProcessing(&ch, scriptChunk) {
wg.Add(1)
go func() {
defer wg.Done()
output, err := processChunk(config, &ch, scriptChunk)
if err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
return
}
result[i] = output
}()
continue
}
result[i] = append(result[i], &ch)
}

if ch.kind == tableKind {
write = WriteAligned
wrap = wrapTable
}
wg.Wait()

scriptChunk := getScriptChunk(chunks, i)
if chunkNeedsProcessing(&ch, scriptChunk) {
data, comments, err := execute(config, &ch, scriptChunk)
if err != nil {
result = append(result, ch)
result = append(
result,
chunk{
kind: messageKind,
text: []string{toMessage(err.Error())},
},
)
continue
}
if err := errors.Join(errs...); err != nil {
return err
}

// create a formatter depending on if align was requested
var sb strings.Builder
err = write(&sb, data, comments)
for _, chs := range result {
for _, ch := range chs {
_, err = fmt.Fprintf(writer, "%s\n", ch)
if err != nil {
return ErrWriteCSV(err)
return ErrWriteMD(err)
}
}
}

lines := wrap(sb.String())
return nil
}

result = append(result, chunk{kind: csvKind, text: lines})
continue
}
result = append(result, ch)
func processChunk(config *Config, data, script *chunk) ([]*chunk, error) {
var result []*chunk
lines, comments, err := execute(config, data, script)
if err != nil {
result = append(result, data)
result = append(
result,
&chunk{
kind: messageKind,
text: []string{toMessage(err.Error())},
},
)
return result, nil
}

for _, ch := range result {
_, err = fmt.Fprintf(writer, "%s\n", ch)
if err != nil {
return ErrWriteMD(err)
}
write := csv.Write
wrap := wrapCodeBlock

if data.kind == csvKind && config.Align {
write = csv.WriteAligned
}

return nil
if data.kind == tableKind {
write = WriteAligned
wrap = wrapTable
}
// create a formatter depending on if align was requested
var sb strings.Builder
err = write(&sb, lines, comments)
if err != nil {
return nil, ErrWriteCSV(err)
}

output := wrap(sb.String())

result = append(result, &chunk{kind: csvKind, text: output})
return result, nil
}

func wrapCodeBlock(code string) []string {
Expand Down
102 changes: 102 additions & 0 deletions internal/markdown/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package markdown

import (
"strings"
"sync"
"testing"
)

Expand Down Expand Up @@ -335,6 +336,45 @@ func TestProcess(t *testing.T) {
"",
}, "\n"),
},
{
name: "multiple CSV with code concurrent",
input: strings.Join([]string{
"```csv",
"one,two,three",
"1,2,3",
"```",
"```tabula",
"let A2 = 10;",
"```",
"",
"```csv",
"one,two,three",
"4,5,6",
"```",
"```tabula",
"let A2 = 20;",
"```",
"",
}, "\n"),
output: strings.Join([]string{
"```csv",
"one,two,three",
"10,2,3",
"```",
"```tabula",
"let A2 = 10;",
"```",
"",
"```csv",
"one,two,three",
"20,5,6",
"```",
"```tabula",
"let A2 = 20;",
"```",
"",
}, "\n"),
},
}

for _, tc := range cases {
Expand All @@ -357,3 +397,65 @@ func TestProcess(t *testing.T) {
})
}
}

// TestProcessConcurrentChunks verifies that multiple processable chunks in a
// single document are each executed with their own correct script and data.
// This guards against closure variable capture bugs where goroutines share
// loop variables and all end up processing the last chunk's data.
func TestProcessConcurrentChunks(t *testing.T) {
input := strings.Join([]string{
"```csv",
"one,two,three",
"1,2,3",
"```",
"```tabula",
"let A2 = 10;",
"```",
"",
"```csv",
"one,two,three",
"4,5,6",
"```",
"```tabula",
"let A2 = 20;",
"```",
"",
}, "\n")

want := strings.Join([]string{
"```csv",
"one,two,three",
"10,2,3",
"```",
"```tabula",
"let A2 = 10;",
"```",
"",
"```csv",
"one,two,three",
"20,5,6",
"```",
"```tabula",
"let A2 = 20;",
"```",
"",
}, "\n")

const iterations = 100
var wg sync.WaitGroup
wg.Add(iterations)
for range iterations {
go func() {
defer wg.Done()
var writer strings.Builder
if err := Process(&Config{}, strings.NewReader(input), &writer); err != nil {
t.Errorf("unexpected error: %s", err)
return
}
if got := writer.String(); got != want {
t.Errorf("concurrent output mismatch:\nwant: %q\n got: %q", want, got)
}
}()
}
wg.Wait()
}
Loading