diff --git a/leveldb/table.go b/leveldb/table.go index b7759b2f..1ba78d1a 100644 --- a/leveldb/table.go +++ b/leveldb/table.go @@ -501,7 +501,6 @@ func (t *tOps) remove(fd storage.FileDesc) { // Closes the table ops instance. It will close all tables, // regadless still used or not. func (t *tOps) close() { - t.bpool.Close() t.cache.Close() if t.bcache != nil { t.bcache.CloseWeak() diff --git a/leveldb/util/buffer_pool.go b/leveldb/util/buffer_pool.go index 2f3db974..53c0a80b 100644 --- a/leveldb/util/buffer_pool.go +++ b/leveldb/util/buffer_pool.go @@ -8,29 +8,20 @@ package util import ( "fmt" - "sync" + "runtime" "sync/atomic" "time" ) -type buffer struct { - b []byte - miss int -} - // BufferPool is a 'buffer pool'. type BufferPool struct { - pool [6]chan []byte + pool []chan []byte size [5]uint32 sizeMiss [5]uint32 sizeHalf [5]uint32 baseline [4]int baseline0 int - mu sync.RWMutex - closed bool - closeC chan struct{} - get uint32 put uint32 half uint32 @@ -58,13 +49,6 @@ func (p *BufferPool) Get(n int) []byte { return make([]byte, n) } - p.mu.RLock() - defer p.mu.RUnlock() - - if p.closed { - return make([]byte, n) - } - atomic.AddUint32(&p.get, 1) poolNum := p.poolNum(n) @@ -160,13 +144,6 @@ func (p *BufferPool) Put(b []byte) { return } - p.mu.RLock() - defer p.mu.RUnlock() - - if p.closed { - return - } - atomic.AddUint32(&p.put, 1) pool := p.pool[p.poolNum(cap(b))] @@ -177,19 +154,6 @@ func (p *BufferPool) Put(b []byte) { } -func (p *BufferPool) Close() { - if p == nil { - return - } - - p.mu.Lock() - if !p.closed { - p.closed = true - p.closeC <- struct{}{} - } - p.mu.Unlock() -} - func (p *BufferPool) String() string { if p == nil { return "" @@ -199,21 +163,20 @@ func (p *BufferPool) String() string { p.baseline0, p.size, p.sizeMiss, p.sizeHalf, p.get, p.put, p.half, p.less, p.equal, p.greater, p.miss) } -func (p *BufferPool) drain() { +func drain(pool []chan []byte, closeC <-chan struct{}) { ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: - for _, ch := range p.pool { + for _, ch := range pool { select { case <-ch: default: } } - case <-p.closeC: - close(p.closeC) - for _, ch := range p.pool { + case <-closeC: + for _, ch := range pool { close(ch) } return @@ -227,13 +190,15 @@ func NewBufferPool(baseline int) *BufferPool { panic("baseline can't be <= 0") } p := &BufferPool{ + pool: make([]chan []byte, 6), baseline0: baseline, baseline: [...]int{baseline / 4, baseline / 2, baseline * 2, baseline * 4}, - closeC: make(chan struct{}, 1), } + closeC := make(chan struct{}, 1) for i, cap := range []int{2, 2, 4, 4, 2, 1} { p.pool[i] = make(chan []byte, cap) } - go p.drain() + runtime.SetFinalizer(p, func(*BufferPool) { close(closeC) }) + go drain(p.pool, closeC) return p } diff --git a/leveldb/util/buffer_pool_test.go b/leveldb/util/buffer_pool_test.go new file mode 100644 index 00000000..06af479b --- /dev/null +++ b/leveldb/util/buffer_pool_test.go @@ -0,0 +1,43 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package util + +import ( + "runtime" + "testing" + "time" +) + +func checkClosed(ch chan []byte) (closed bool) { + defer func() { + x := recover() + if err, ok := x.(error); ok && err.Error() == "send on closed channel" { + closed = true + } + }() + select { + case ch <- nil: + default: + } + return +} + +func TestBufferPoolCloseByGC(t *testing.T) { + bpool := NewBufferPool(1024) + pool := bpool.pool // ref to check closed + buf := bpool.Get(1024) + if 1024 != len(buf) { + t.Errorf("Get() return invalid length buffer, got(%v), expect(%v)", len(buf), 1024) + } + bpool.Put(buf) + bpool = nil + runtime.GC() + time.Sleep(time.Second) + for i, ch := range pool { + if !checkClosed(ch) { + t.Errorf("pool[%d] should be closed after GC", i) + } + } +}