Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 45 additions & 38 deletions internal/mux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ type playerImpl struct {
volume float64
err error
state playerState
bufPool *sync.Pool
buf []byte
eof bool
bufferSize int
Expand Down Expand Up @@ -239,41 +238,29 @@ func (p *playerImpl) setBufferSize(bufferSize int) {
p.m.Lock()
defer p.m.Unlock()

orig := p.bufferSize
p.bufferSize = bufferSize
if bufferSize == 0 {
p.bufferSize = p.mux.defaultBufferSize()
}
if orig != p.bufferSize {
p.bufPool = nil
}
}

func (p *playerImpl) getTmpBuf() ([]byte, func()) {
// The returned buffer could be accessed regardless of the mutex m (#254).
// In order to avoid races, use a sync.Pool.
// On the other hand, the calls of getTmpBuf itself should be protected by the mutex m,
// then accessing p.bufPool doesn't cause races.
if p.bufPool == nil {
p.bufPool = &sync.Pool{
New: func() interface{} {
buf := make([]byte, p.bufferSize)
return &buf
},
}
}
buf := p.bufPool.Get().(*[]byte)
return *buf, func() {
// p.bufPool could be nil when setBufferSize is called (#258).
// buf doesn't have to (or cannot) be put back to the pool, as the size of the buffer could be changed.
if p.bufPool == nil {
return
}
if len(*buf) != p.bufferSize {
return
}
p.bufPool.Put(buf)
var theBufPool = sync.Pool{
New: func() interface{} {
var buf []byte
return &buf
},
}

func getBufferFromPool(size int) *[]byte {
buf := theBufPool.Get().(*[]byte)

if cap(*buf) < size {
*buf = make([]byte, size)
}

*buf = (*buf)[:size]

return buf
}

// read reads the source to buf.
Expand Down Expand Up @@ -315,15 +302,20 @@ func (p *playerImpl) playImpl() {
p.state = playerPlay

if !p.eof {
buf, free := p.getTmpBuf()
defer free()
buf := getBufferFromPool(p.bufferSize)
defer theBufPool.Put(buf)

if p.buf == nil {
p.buf = (*getBufferFromPool(p.bufferSize))[:0]
}

for len(p.buf) < p.bufferSize {
Comment thread
oliverbestmann marked this conversation as resolved.
n, err := p.read(buf)
n, err := p.read(*buf)
if err != nil && err != io.EOF {
p.setErrorImpl(err)
return
}
p.buf = append(p.buf, buf[:n]...)
p.buf = append(p.buf, (*buf)[:n]...)
if err == io.EOF {
p.eof = true
break
Expand All @@ -332,6 +324,7 @@ func (p *playerImpl) playImpl() {
}

if p.eof && len(p.buf) == 0 {
p.returnBufferToPool()
p.state = playerPaused
}

Expand Down Expand Up @@ -456,7 +449,8 @@ func (p *playerImpl) closeImpl() error {
return p.err
}
p.state = playerClosed
p.buf = nil
p.returnBufferToPool()

return p.err
}

Expand Down Expand Up @@ -514,6 +508,7 @@ func (p *playerImpl) readBufferAndAdd(buf []float32) int {
p.buf = p.buf[:len(p.buf)-n*bitDepthInBytes]

if p.eof && len(p.buf) == 0 {
p.returnBufferToPool()
p.state = playerPaused
}

Expand Down Expand Up @@ -545,16 +540,20 @@ func (p *playerImpl) readSourceToBuffer() int {
return 0
}

buf, free := p.getTmpBuf()
defer free()
n, err := p.read(buf)
buf := getBufferFromPool(p.bufferSize)
defer theBufPool.Put(buf)
n, err := p.read(*buf)

if err != nil && err != io.EOF {
p.setErrorImpl(err)
return 0
}

p.buf = append(p.buf, buf[:n]...)
if p.buf == nil {
p.buf = (*getBufferFromPool(p.bufferSize))[:0]
}

p.buf = append(p.buf, (*buf)[:n]...)
if err == io.EOF {
p.eof = true
if len(p.buf) == 0 {
Expand All @@ -569,6 +568,14 @@ func (p *playerImpl) setErrorImpl(err error) {
p.closeImpl()
}

func (p *playerImpl) returnBufferToPool() {
if p.buf != nil {
buf := p.buf
theBufPool.Put(&buf)
p.buf = nil
}
}

// TODO: The term 'buffer' is confusing. Name each buffer with good terms.

// defaultBufferSize returns the default size of the buffer for the audio source.
Expand Down
Loading