Skip to content

Commit 7f2eda9

Browse files
committed
CV Optimistic Export and Import
1 parent 11ba496 commit 7f2eda9

7 files changed

+303
-27
lines changed

export.go

+58-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package iavl
22

33
import (
4+
"bytes"
45
"context"
56
"errors"
67
"fmt"
@@ -31,13 +32,24 @@ type ExportNode struct {
3132
// depth-first post-order (LRN), this order must be preserved when importing in order to recreate
3233
// the same tree structure.
3334
type Exporter struct {
34-
tree *ImmutableTree
35-
ch chan *ExportNode
36-
cancel context.CancelFunc
35+
tree *ImmutableTree
36+
ch chan *ExportNode
37+
cancel context.CancelFunc
38+
optimistic bool // export raw key value pairs for optimistic import
3739
}
3840

3941
// NewExporter creates a new Exporter. Callers must call Close() when done.
4042
func newExporter(tree *ImmutableTree) (*Exporter, error) {
43+
return newExporterWithOptions(tree, false)
44+
}
45+
46+
// NewOptimisticExporter creates a new Exporter with raw Key Values. Callers must call Close() when done.
47+
func newOptimisticExporter(tree *ImmutableTree) (*Exporter, error) {
48+
return newExporterWithOptions(tree, true)
49+
}
50+
51+
// NewExporterWithOptions creates a new Exporter and configures optimistic mode
52+
func newExporterWithOptions(tree *ImmutableTree, optimistic bool) (*Exporter, error) {
4153
if tree == nil {
4254
return nil, fmt.Errorf("tree is nil: %w", ErrNotInitalizedTree)
4355
}
@@ -48,13 +60,18 @@ func newExporter(tree *ImmutableTree) (*Exporter, error) {
4860

4961
ctx, cancel := context.WithCancel(context.Background())
5062
exporter := &Exporter{
51-
tree: tree,
52-
ch: make(chan *ExportNode, exportBufferSize),
53-
cancel: cancel,
63+
tree: tree,
64+
ch: make(chan *ExportNode, exportBufferSize),
65+
cancel: cancel,
66+
optimistic: optimistic,
5467
}
5568

5669
tree.ndb.incrVersionReaders(tree.version)
57-
go exporter.export(ctx)
70+
if exporter.optimistic {
71+
go exporter.optimisticExport(ctx)
72+
} else {
73+
go exporter.export(ctx)
74+
}
5875

5976
return exporter, nil
6077
}
@@ -79,6 +96,40 @@ func (e *Exporter) export(ctx context.Context) {
7996
close(e.ch)
8097
}
8198

99+
// optimisticExport exports raw key, value nodes
100+
// Cosmos-SDK should set different snapshot format so nodes can select between either "untrusted statesync" or "trusted-peer optimistic" import
101+
func (e *Exporter) optimisticExport(ctx context.Context) {
102+
e.tree.root.traverse(e.tree, true, func(node *Node) bool {
103+
// TODO: How to get the original db value bytes directly without writeBytes()?
104+
buf := bufPool.Get().(*bytes.Buffer)
105+
buf.Reset()
106+
defer bufPool.Put(buf)
107+
108+
if err := node.writeBytes(buf); err != nil {
109+
fmt.Printf("WARN: failed writeBytes")
110+
}
111+
112+
bytesCopy := make([]byte, buf.Len())
113+
copy(bytesCopy, buf.Bytes())
114+
115+
// Use Export Node Format.
116+
exportNode := &ExportNode{
117+
Key: node.GetKey(), // TODO: How to get prefixed key so that import does not need to prefix?
118+
Value: bytesCopy,
119+
Version: 0, // Version not used
120+
Height: 0, // Height not used
121+
}
122+
123+
select {
124+
case e.ch <- exportNode:
125+
return false
126+
case <-ctx.Done():
127+
return true
128+
}
129+
})
130+
close(e.ch)
131+
}
132+
82133
// Next fetches the next exported node, or returns ExportDone when done.
83134
func (e *Exporter) Next() (*ExportNode, error) {
84135
if exportNode, ok := <-e.ch; ok {

export_test.go

+56
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,62 @@ func TestExporter_Import(t *testing.T) {
298298
}
299299
}
300300

301+
func TestOptimisticExporter_Import(t *testing.T) {
302+
testcases := map[string]*ImmutableTree{
303+
"empty tree": NewImmutableTree(dbm.NewMemDB(), 0, false, log.NewNopLogger()),
304+
"basic tree": setupExportTreeBasic(t),
305+
}
306+
if !testing.Short() {
307+
testcases["sized tree"] = setupExportTreeSized(t, 4096)
308+
testcases["random tree"] = setupExportTreeRandom(t)
309+
}
310+
311+
for desc, tree := range testcases {
312+
tree := tree
313+
t.Run(desc, func(t *testing.T) {
314+
t.Parallel()
315+
316+
exporter, err := tree.OptimisticExport()
317+
require.NoError(t, err)
318+
defer exporter.Close()
319+
320+
newTree := NewMutableTree(dbm.NewMemDB(), 0, false, log.NewNopLogger())
321+
importer, err := newTree.OptimisticImport(tree.Version())
322+
require.NoError(t, err)
323+
defer importer.Close()
324+
325+
for {
326+
item, err := exporter.Next()
327+
if err == ErrorExportDone {
328+
err = importer.Commit()
329+
require.NoError(t, err)
330+
break
331+
}
332+
require.NoError(t, err)
333+
err = importer.Add(item)
334+
require.NoError(t, err)
335+
}
336+
337+
treeHash := tree.Hash()
338+
newTreeHash := newTree.Hash()
339+
340+
require.Equal(t, treeHash, newTreeHash, "Tree hash mismatch")
341+
require.Equal(t, tree.Size(), newTree.Size(), "Tree size mismatch")
342+
require.Equal(t, tree.Version(), newTree.Version(), "Tree version mismatch")
343+
344+
tree.Iterate(func(key, value []byte) bool { //nolint:errcheck
345+
index, _, err := tree.GetWithIndex(key)
346+
require.NoError(t, err)
347+
newIndex, newValue, err := newTree.GetWithIndex(key)
348+
require.NoError(t, err)
349+
require.Equal(t, index, newIndex, "Index mismatch for key %v", key)
350+
require.Equal(t, value, newValue, "Value mismatch for key %v", key)
351+
return false
352+
})
353+
})
354+
}
355+
}
356+
301357
func TestExporter_Close(t *testing.T) {
302358
tree := setupExportTreeSized(t, 4096)
303359
exporter, err := tree.Export()

immutable_tree.go

+6
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,12 @@ func (t *ImmutableTree) Export() (*Exporter, error) {
160160
return newExporter(t)
161161
}
162162

163+
// OptimisiticExport returns an iterator that exports tree nodes as ExportNodes. These nodes can be
164+
// imported with MutableTree.Import() to recreate an identical tree.
165+
func (t *ImmutableTree) OptimisticExport() (*Exporter, error) {
166+
return newOptimisticExporter(t)
167+
}
168+
163169
// GetWithIndex returns the index and value of the specified key if it exists, or nil and the next index
164170
// otherwise. The returned value must not be modified, since it may point to data stored within
165171
// IAVL.

import.go

+94-20
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,27 @@ type Importer struct {
3131

3232
// inflightCommit tracks a batch commit, if any.
3333
inflightCommit <-chan error
34+
35+
// Optimistic raw key value import
36+
optimistic bool
3437
}
3538

3639
// newImporter creates a new Importer for an empty MutableTree.
3740
//
3841
// version should correspond to the version that was initially exported. It must be greater than
3942
// or equal to the highest ExportNode version number given.
4043
func newImporter(tree *MutableTree, version int64) (*Importer, error) {
44+
return newImporterWithOptions(tree, version, false)
45+
}
46+
47+
// newOptimisticImporter creates a new Importer for an empty MutableTree.
48+
//
49+
// expects optimistic raw key values for import
50+
func newOptimisticImporter(tree *MutableTree, version int64) (*Importer, error) {
51+
return newImporterWithOptions(tree, version, true)
52+
}
53+
54+
func newImporterWithOptions(tree *MutableTree, version int64, optimistic bool) (*Importer, error) {
4155
if version < 0 {
4256
return nil, errors.New("imported version cannot be negative")
4357
}
@@ -49,11 +63,12 @@ func newImporter(tree *MutableTree, version int64) (*Importer, error) {
4963
}
5064

5165
return &Importer{
52-
tree: tree,
53-
version: version,
54-
batch: tree.ndb.db.NewBatch(),
55-
stack: make([]*Node, 0, 8),
56-
nonces: make([]uint32, version+1),
66+
tree: tree,
67+
version: version,
68+
batch: tree.ndb.db.NewBatch(),
69+
stack: make([]*Node, 0, 8),
70+
nonces: make([]uint32, version+1),
71+
optimistic: optimistic,
5772
}, nil
5873
}
5974

@@ -117,10 +132,65 @@ func (i *Importer) Close() {
117132
i.tree = nil
118133
}
119134

135+
// sendBatchIfFull can be called during imports after each key add
136+
// automatically batch.Write() when pending writes > maxBatchSize
137+
func (i *Importer) sendBatchIfFull() error {
138+
if i.batchSize >= maxBatchSize {
139+
// Wait for previous batch.
140+
var err error
141+
if i.inflightCommit != nil {
142+
err = <-i.inflightCommit
143+
i.inflightCommit = nil
144+
}
145+
if err != nil {
146+
return err
147+
}
148+
result := make(chan error)
149+
i.inflightCommit = result
150+
go func(batch db.Batch) {
151+
defer batch.Close()
152+
result <- batch.Write()
153+
}(i.batch)
154+
i.batch = i.tree.ndb.db.NewBatch()
155+
i.batchSize = 0
156+
}
157+
158+
return nil
159+
}
160+
161+
// OptimisticAdd adds a TRUSTED leveldb key value pair WITHOUT verification
162+
func (i *Importer) OptimisticAdd(exportNode *ExportNode) error {
163+
if i.tree == nil {
164+
return ErrNoImport
165+
}
166+
if exportNode == nil {
167+
return errors.New("node cannot be nil")
168+
}
169+
if exportNode.Key == nil {
170+
return errors.New("node.Key cannot be nil")
171+
}
172+
if exportNode.Value == nil {
173+
return errors.New("node.Value cannot be nil")
174+
}
175+
176+
if err := i.batch.Set(i.tree.ndb.nodeKey(exportNode.Key), exportNode.Value); err != nil {
177+
return err
178+
}
179+
i.batchSize++
180+
181+
i.sendBatchIfFull()
182+
183+
return nil
184+
}
185+
120186
// Add adds an ExportNode to the import. ExportNodes must be added in the order returned by
121187
// Exporter, i.e. depth-first post-order (LRN). Nodes are periodically flushed to the database,
122188
// but the imported version is not visible until Commit() is called.
123189
func (i *Importer) Add(exportNode *ExportNode) error {
190+
// Keep the same Add(node) API but run faster optimistic import when configured
191+
if i.optimistic {
192+
return i.OptimisticAdd(exportNode)
193+
}
124194
if i.tree == nil {
125195
return ErrNoImport
126196
}
@@ -193,24 +263,28 @@ func (i *Importer) Commit() error {
193263
return ErrNoImport
194264
}
195265

196-
switch len(i.stack) {
197-
case 0:
198-
if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), []byte{}); err != nil {
199-
return err
200-
}
201-
case 1:
202-
i.stack[0].nodeKey.nonce = 1
203-
if err := i.writeNode(i.stack[0]); err != nil {
204-
return err
205-
}
206-
if i.stack[0].nodeKey.version < i.version { // it means there is no update in the given version
207-
if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), i.tree.ndb.nodeKeyPrefix(i.stack[0].nodeKey.version)); err != nil {
266+
if i.optimistic {
267+
// All keys should be already imported
268+
} else {
269+
switch len(i.stack) {
270+
case 0:
271+
if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), []byte{}); err != nil {
208272
return err
209273
}
274+
case 1:
275+
i.stack[0].nodeKey.nonce = 1
276+
if err := i.writeNode(i.stack[0]); err != nil {
277+
return err
278+
}
279+
if i.stack[0].nodeKey.version < i.version { // it means there is no update in the given version
280+
if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), i.tree.ndb.nodeKeyPrefix(i.stack[0].nodeKey.version)); err != nil {
281+
return err
282+
}
283+
}
284+
default:
285+
return fmt.Errorf("invalid node structure, found stack size %v when committing",
286+
len(i.stack))
210287
}
211-
default:
212-
return fmt.Errorf("invalid node structure, found stack size %v when committing",
213-
len(i.stack))
214288
}
215289

216290
err := i.batch.WriteSync()

import_test.go

+37
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,43 @@ func TestImporter_Commit_Empty(t *testing.T) {
232232
assert.EqualValues(t, 3, tree.Version())
233233
}
234234

235+
func TestImporter_OptimisticAdd(t *testing.T) {
236+
k := []byte("rawStoreKey")
237+
v := []byte("rawStoreValue")
238+
239+
testcases := map[string]struct {
240+
node *ExportNode
241+
valid bool
242+
}{
243+
"nil node": {nil, false},
244+
"trusted_valid": {&ExportNode{Key: k, Value: v, Version: 1, Height: 0}, true},
245+
"no key": {&ExportNode{Key: nil, Value: v, Version: 1, Height: 0}, false},
246+
"no value": {&ExportNode{Key: k, Value: nil, Version: 1, Height: 0}, false},
247+
// Only Key and Value used for Optimistic Add
248+
// Version and Height is ignored
249+
// further cases will be handled by Node.validate()
250+
}
251+
for desc, tc := range testcases {
252+
tc := tc // appease scopelint
253+
t.Run(desc, func(t *testing.T) {
254+
tree := NewMutableTree(dbm.NewMemDB(), 0, false, log.NewNopLogger())
255+
importer, err := tree.Import(1)
256+
require.NoError(t, err)
257+
defer importer.Close()
258+
259+
err = importer.OptimisticAdd(tc.node)
260+
if tc.valid {
261+
require.NoError(t, err)
262+
} else {
263+
if err == nil {
264+
err = importer.Commit()
265+
}
266+
require.Error(t, err)
267+
}
268+
})
269+
}
270+
}
271+
235272
func BenchmarkImport(b *testing.B) {
236273
benchmarkImport(b, 4096)
237274
}

mutable_tree.go

+6
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,12 @@ func (tree *MutableTree) Import(version int64) (*Importer, error) {
200200
return newImporter(tree, version)
201201
}
202202

203+
// OptimisticImport returns an importer for tree nodes previously exported by ImmutableTree.OptimisticExport(),
204+
// producing an identical IAVL tree. The caller must call Close() on the importer when done.
205+
func (tree *MutableTree) OptimisticImport(version int64) (*Importer, error) {
206+
return newOptimisticImporter(tree, version)
207+
}
208+
203209
// Iterate iterates over all keys of the tree. The keys and values must not be modified,
204210
// since they may point to data stored within IAVL. Returns true if stopped by callnack, false otherwise
205211
func (tree *MutableTree) Iterate(fn func(key []byte, value []byte) bool) (stopped bool, err error) {

0 commit comments

Comments
 (0)