diff --git a/bmt/bmt.go b/bmt/bmt.go
index 1753e7a941..37353aeb3f 100644
--- a/bmt/bmt.go
+++ b/bmt/bmt.go
@@ -25,9 +25,6 @@ import (
 	"strings"
 	"sync"
 	"sync/atomic"
-
-	"github.com/ethersphere/swarm/file"
-	"github.com/ethersphere/swarm/log"
 )
 
 /*
@@ -290,12 +287,6 @@ func newTree(segmentSize, depth int, hashfunc func() hash.Hash) *tree {
 	}
 }
 
-// SetWriter implements file.SectionWriter
-func (h *Hasher) SetWriter(_ file.SectionWriterFunc) file.SectionWriter {
-	log.Warn("Synchasher does not currently support SectionWriter chaining")
-	return h
-}
-
 // SectionSize implements file.SectionWriter
 func (h *Hasher) SectionSize() int {
 	return h.pool.SegmentSize
diff --git a/file/bmt/async.go b/file/bmt/async.go
index 2b8a6331ad..4076121c13 100644
--- a/file/bmt/async.go
+++ b/file/bmt/async.go
@@ -22,6 +22,8 @@ import (
 	"sync"
 
 	"github.com/ethersphere/swarm/bmt"
+	"github.com/ethersphere/swarm/file"
+	"github.com/ethersphere/swarm/log"
 )
 
 // NewAsyncWriter extends Hasher with an interface for concurrent segment.GetSection() writes
@@ -177,3 +179,9 @@ func (sw *AsyncHasher) SumIndexed(b []byte, length int) (s []byte) {
 	hsh.Write(s)
 	return hsh.Sum(b)
 }
+
+// SetWriter implements file.SectionWriter
+func (sw *AsyncHasher) SetWriter(_ file.SectionWriterFunc) file.SectionWriter {
+	log.Warn("BMT hasher does not currently support SectionWriter chaining")
+	return sw
+}
diff --git a/file/hasher/common_test.go b/file/hasher/common_test.go
index bad3556420..fba203e690 100644
--- a/file/hasher/common_test.go
+++ b/file/hasher/common_test.go
@@ -1,7 +1,18 @@
 package hasher
 
 import (
+	"bytes"
+	"context"
+	"encoding/binary"
+	"hash"
+	"sync"
+	"testing"
+
+	"github.com/ethereum/go-ethereum/common/hexutil"
+	"github.com/ethersphere/swarm/file"
+	"github.com/ethersphere/swarm/log"
 	"github.com/ethersphere/swarm/testutil"
+	"golang.org/x/crypto/sha3"
 )
 
 const (
@@ -64,3 +75,201 @@ var (
 func init() {
 	testutil.Init()
 }
+
+var (
+	dummyHashFunc = func(_ context.Context) file.SectionWriter {
+		return newDummySectionWriter(chunkSize*branches, sectionSize, sectionSize, branches)
+	}
+)
+
+// simple file.SectionWriter hasher that keeps the data written to it
+// for later inspection
+// TODO: see if this can be replaced with the fake hasher from storage module
+type dummySectionWriter struct {
+	sectionSize int
+	digestSize  int
+	branches    int
+	data        []byte
+	digest      []byte
+	size        int
+	span        []byte
+	summed      bool
+	index       int
+	writer      hash.Hash
+	mu          sync.Mutex
+	wg          sync.WaitGroup
+}
+
+// dummySectionWriter constructor
+func newDummySectionWriter(cp int, sectionSize int, digestSize int, branches int) *dummySectionWriter {
+	log.Trace("creating dummy writer", "sectionsize", sectionSize, "digestsize", digestSize, "branches", branches)
+	return &dummySectionWriter{
+		sectionSize: sectionSize,
+		digestSize:  digestSize,
+		branches:    branches,
+		data:        make([]byte, cp),
+		writer:      sha3.NewLegacyKeccak256(),
+		digest:      make([]byte, digestSize),
+	}
+}
+
+// implements file.SectionWriter
+func (d *dummySectionWriter) SetWriter(_ file.SectionWriterFunc) file.SectionWriter {
+	log.Error("dummySectionWriter does not support SectionWriter chaining")
+	return d
+}
+
+// implements file.SectionWriter
+func (d *dummySectionWriter) SeekSection(offset int) {
+	d.index = offset * d.SectionSize()
+}
+
+// implements file.SectionWriter
+func (d *dummySectionWriter) SetLength(length int) {
+	d.size = length
+}
+
+// implements file.SectionWriter
+func (d *dummySectionWriter) SetSpan(length int) {
+	d.span = make([]byte, 8)
+	binary.LittleEndian.PutUint64(d.span, uint64(length))
+}
+
+// implements file.SectionWriter
+func (d *dummySectionWriter) Write(data []byte) (int, error) {
+	d.mu.Lock()
+	copy(d.data[d.index:], data)
+	d.size += len(data)
+	log.Trace("dummywriter write", "index", d.index, "size", d.size, "threshold", d.sectionSize*d.branches)
+	if d.isFull() {
+		d.summed = true
+		d.mu.Unlock()
+		d.sum()
+	} else {
+		d.mu.Unlock()
+	}
+	return len(data), nil
+}
+
+// implements file.SectionWriter
+func (d *dummySectionWriter) Sum(_ []byte) []byte {
+	log.Trace("dummy Sum call", "size", d.size)
+	d.mu.Lock()
+	if !d.summed {
+		d.summed = true
+		d.mu.Unlock()
+		d.sum()
+	} else {
+		d.mu.Unlock()
+	}
+	return d.digest
+}
+
+// invokes sum on the underlying writer
+func (d *dummySectionWriter) sum() {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	d.writer.Write(d.span)
+	log.Trace("dummy sum writing span", "span", d.span)
+	for i := 0; i < d.size; i += d.writer.Size() {
+		sectionData := d.data[i : i+d.writer.Size()]
+		log.Trace("dummy sum write", "i", i/d.writer.Size(), "data", hexutil.Encode(sectionData), "size", d.size)
+		d.writer.Write(sectionData)
+	}
+	copy(d.digest, d.writer.Sum(nil))
+	log.Trace("dummy sum result", "ref", hexutil.Encode(d.digest))
+}
+
+// implements file.SectionWriter
+func (d *dummySectionWriter) Reset() {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	d.data = make([]byte, len(d.data))
+	d.digest = make([]byte, d.digestSize)
+	d.size = 0
+	d.summed = false
+	d.span = nil
+	d.writer.Reset()
+}
+
+// implements file.SectionWriter
+func (d *dummySectionWriter) BlockSize() int {
+	return d.sectionSize
+}
+
+// implements file.SectionWriter
+func (d *dummySectionWriter) SectionSize() int {
+	return d.sectionSize
+}
+
+// implements file.SectionWriter
+func (d *dummySectionWriter) Size() int {
+	return d.sectionSize
+}
+
+// implements file.SectionWriter
+func (d *dummySectionWriter) Branches() int {
+	return d.branches
+}
+
+// returns true if the hasher is written to its capacity limit
+func (d *dummySectionWriter) isFull() bool {
+	return d.size == d.sectionSize*d.branches
+}
+
+// implements file.SectionWriter
+func (d *dummySectionWriter) SumIndexed(b []byte, l int) []byte {
+	//log.Trace("dummy sum indexed", "d", d.data[:l], "l", l, "b", b, "s", d.span)
+	d.writer.Write(d.span)
+	d.writer.Write(d.data[:l])
+	return d.writer.Sum(b)
+}
+
+// implements file.SectionWriter
+func (d *dummySectionWriter) WriteIndexed(i int, b []byte) {
+	//log.Trace("dummy write indexed", "i", i, "b", len(b))
+	copy(d.data[i*d.sectionSize:], b)
+}
+
+// TestDummySectionWriter verifies that the dummy writer stores section writes at the
+// correct offsets and produces the expected digests for two different section size settings
+func TestDummySectionWriter(t *testing.T) {
+
+	w := newDummySectionWriter(chunkSize*2, sectionSize, sectionSize, branches)
+	w.Reset()
+
+	_, data := testutil.SerialData(sectionSize*2, 255, 0)
+
+	w.SeekSection(branches)
+	w.Write(data[:sectionSize])
+	w.SeekSection(branches + 1)
+	w.Write(data[sectionSize:])
+	if !bytes.Equal(w.data[chunkSize:chunkSize+sectionSize*2], data) {
+		t.Fatalf("Write double pos %d: expected %x, got %x", chunkSize, data, w.data[chunkSize:chunkSize+sectionSize*2])
+	}
+
+	correctDigestHex := "0x52eefd0c37895a8845d4a6cf6c6b56980e448376e55eb45717663ab7b3fc8d53"
+	w.SetLength(chunkSize * 2)
+	w.SetSpan(chunkSize * 2)
+	digest := w.Sum(nil)
+	digestHex := hexutil.Encode(digest)
+	if digestHex != correctDigestHex {
+		t.Fatalf("Digest: 2xsectionSize*1; expected %s, got %s", correctDigestHex, digestHex)
+	}
+
+	w = newDummySectionWriter(chunkSize*2, sectionSize*2, sectionSize*2, branches/2)
+	w.Reset()
+	w.SeekSection(branches / 2)
+	w.Write(data)
+	if !bytes.Equal(w.data[chunkSize:chunkSize+sectionSize*2], data) {
+		t.Fatalf("Write double pos %d: expected %x, got %x", chunkSize, data, w.data[chunkSize:chunkSize+sectionSize*2])
+	}
+
+	correctDigestHex += zeroHex
+	w.SetLength(chunkSize * 2)
+	w.SetSpan(chunkSize * 2)
+	digest = w.Sum(nil)
+	digestHex = hexutil.Encode(digest)
+	if digestHex != correctDigestHex {
+		t.Fatalf("Digest 1xsectionSize*2; expected %s, got %s", correctDigestHex, digestHex)
+	}
+}
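+
+// Example (illustrative sketch, not exercised by the tests above): the basic
+// write/sum cycle a client drives against a file.SectionWriter such as the
+// dummySectionWriter, using this package's sectionSize and branches constants.
+// Note that Sum digests the first SetLength bytes of the internal buffer:
+//
+//	w := newDummySectionWriter(chunkSize*branches, sectionSize, sectionSize, branches)
+//	w.Reset()                 // clear data, digest, span and cursor state
+//	w.SeekSection(0)          // position the write cursor at section index 0
+//	w.Write(data)             // write one sectionSize worth of data at the cursor
+//	w.SetLength(sectionSize)  // declare the total byte length written
+//	w.SetSpan(sectionSize)    // declare the data span the chunk represents
+//	ref := w.Sum(nil)         // digest over the span prefix plus the written section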
diff --git a/file/hasher/index.go b/file/hasher/index.go
new file mode 100644
index 0000000000..acf8033766
--- /dev/null
+++ b/file/hasher/index.go
@@ -0,0 +1,79 @@
+package hasher
+
+import (
+	"fmt"
+	"sync"
+)
+
+// jobIndex keeps an index of all the existing jobs for a file hashing operation
+// sorted by level
+//
+// it also keeps all the "top hashes", ie hashes on the first data section index of every level
+// these are needed in case of balanced tree results, since the hashing result would otherwise
+// be lost, due to the job not having any intermediate storage of any data
+type jobIndex struct {
+	maxLevels int
+	jobs      []sync.Map
+	topHashes [][]byte
+	mu        sync.Mutex
+}
+
+// jobIndex constructor
+func newJobIndex(maxLevels int) *jobIndex {
+	ji := &jobIndex{
+		maxLevels: maxLevels,
+	}
+	for i := 0; i < maxLevels; i++ {
+		ji.jobs = append(ji.jobs, sync.Map{})
+	}
+	return ji
+}
+
+// implements the Stringer interface
+func (ji *jobIndex) String() string {
+	return fmt.Sprintf("%p", ji)
+}
+
+// Add adds a job to the index at the level and data section index specified in the job
+func (ji *jobIndex) Add(jb *job) {
+	ji.jobs[jb.level].Store(jb.dataSection, jb)
+}
+
+// Get retrieves a job from the job index based on the level of the job and its data section index
+//
+// If a job for the level and section index does not exist this method returns nil
+func (ji *jobIndex) Get(lvl int, section int) *job {
+	jb, ok := ji.jobs[lvl].Load(section)
+	if !ok {
+		return nil
+	}
+	return jb.(*job)
+}
+
+// Delete removes a job from the job index, leaving it to be garbage collected when
+// the reference in the main code is relinquished
+func (ji *jobIndex) Delete(jb *job) {
+	ji.jobs[jb.level].Delete(jb.dataSection)
+}
+
+// AddTopHash should be called by a job when a hash is written to the first index of a level
+//
+// Since the job doesn't store any data written to it (it just passes it through to the
+// underlying writer), this is needed for the edge case of balanced trees
+func (ji *jobIndex) AddTopHash(ref []byte) {
+	ji.mu.Lock()
+	defer ji.mu.Unlock()
+	ji.topHashes = append(ji.topHashes, ref)
+}
+
+// GetTopHash gets the current top hash for a particular level set by AddTopHash
+func (ji *jobIndex) GetTopHash(lvl int) []byte {
+	ji.mu.Lock()
+	defer ji.mu.Unlock()
+	return ji.topHashes[lvl-1]
+}
+
+// GetTopHashLevel gets the level of the current topmost hash
+func (ji *jobIndex) GetTopHashLevel() int {
+	ji.mu.Lock()
+	defer ji.mu.Unlock()
+	return len(ji.topHashes)
+}
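+
+// Example (illustrative sketch, not part of this file): the lookup-or-create flow
+// that job.parent() in job.go drives against the index, keyed by level and data section:
+//
+//	ji := newJobIndex(9)          // one sync.Map per level, up to 9 levels
+//	jb := ji.Get(lvl, section)    // nil if no job covers this span yet
+//	if jb == nil {
+//		jb = newJob(params, tgt, ji, lvl, section) // the constructor calls ji.Add(jb)
+//	}
+//	ji.AddTopHash(ref)            // record a hash written to the first section of a level
+//	ji.Delete(jb)                 // drop the entry once the job has summed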
diff --git a/file/hasher/job.go b/file/hasher/job.go
new file mode 100644
index 0000000000..22aeb5c107
--- /dev/null
+++ b/file/hasher/job.go
@@ -0,0 +1,340 @@
+package hasher
+
+import (
+	"fmt"
+	"sync"
+	"sync/atomic"
+
+	"github.com/ethereum/go-ethereum/common/hexutil"
+	"github.com/ethersphere/swarm/file"
+	"github.com/ethersphere/swarm/log"
+)
+
+// jobUnit stores the necessary metadata for the asynchronous processing of a single chunk
+type jobUnit struct {
+	index int
+	data  []byte
+	count int
+}
+
+// job encapsulates a single intermediate chunk to be processed
+type job struct {
+	target *target
+	params *treeParams
+	index  *jobIndex
+
+	level            int    // level in tree
+	dataSection      int    // data section index
+	cursorSection    int32  // next write position in job
+	endCount         int32  // number of writes to be written to this job (0 means write to capacity)
+	lastSectionSize  int    // data size on the last data section write
+	firstSectionData []byte // store first section of data written to solve the dangling chunk edge case
+
+	writeC chan jobUnit       // receives data fully processed by the underlying writer
+	writer file.SectionWriter // underlying data processor (eg. hasher)
+	doneC  chan struct{}      // pointer to target doneC channel, set to nil in process() when closed
+
+	mu sync.Mutex
+}
+
+// job constructor
+func newJob(params *treeParams, tgt *target, jobIndex *jobIndex, lvl int, dataSection int) *job {
+	jb := &job{
+		params:      params,
+		index:       jobIndex,
+		level:       lvl,
+		dataSection: dataSection,
+		writeC:      make(chan jobUnit),
+		target:      tgt,
+		doneC:       nil,
+	}
+	if jb.index == nil {
+		jb.index = newJobIndex(9)
+	}
+	targetLevel := tgt.Level()
+	if targetLevel == 0 {
+		log.Trace("target not set", "level", lvl)
+		jb.doneC = tgt.doneC
+
+	} else {
+		log.Trace("target set", "level", lvl, "targetLevel", targetLevel)
+		targetCount := tgt.Count()
+		jb.endCount = int32(jb.targetCountToEndCount(targetCount))
+	}
+	log.Trace("target count", "level", lvl, "count", tgt.Count())
+
+	jb.index.Add(jb)
+	return jb
+}
+
+// starts the asynchronous chunk processor that dispatches data to the underlying writer
+func (jb *job) start() {
+	jb.writer = jb.params.GetWriter()
+	go jb.process()
+}
+
+// implements the Stringer interface
+func (jb *job) String() string {
+	return fmt.Sprintf("job: l:%d,s:%d", jb.level, jb.dataSection)
+}
+
+// atomically increments the write counter of the job
+func (jb *job) inc() int {
+	return int(atomic.AddInt32(&jb.cursorSection, 1))
+}
+
+// atomically returns the write counter of the job
+func (jb *job) count() int {
+	return int(atomic.LoadInt32(&jb.cursorSection))
+}
+
+// size returns the byte size of the span the job represents
+//
+// If the job is the last index in a level and writes have been finalized, it will return the target size.
+// Otherwise, regardless of job index, it will return the size according to the current write count
+//
+// TODO: returning expected size in one case and actual size in another can lead to confusion
+func (jb *job) size() int {
+	jb.mu.Lock()
+	count := int(jb.cursorSection) //jb.count()
+	endCount := int(jb.endCount)   //int(atomic.LoadInt32(&jb.endCount))
+	jb.mu.Unlock()
+	if endCount%jb.params.Branches == 0 {
+		return count * jb.params.SectionSize * jb.params.Spans[jb.level]
+	}
+	log.Trace("size", "sections", jb.target.sections, "size", jb.target.Size(), "endcount", endCount, "level", jb.level)
+	return jb.target.Size() % (jb.params.Spans[jb.level] * jb.params.SectionSize * jb.params.Branches)
+}
+
+// add data to the job
+//
+// Note: Does no checking for data length or index validity
+//
+// TODO: rename index param not to confuse with index object
+func (jb *job) write(idx int, data []byte) {
+
+	jb.inc()
+
+	// if a write is received at the first data section of a level we need to store this hash
+	// in case of a balanced tree, as we will need to send it to resultC later
+	// at the time of hashing of a balanced tree we have no way of knowing for sure whether
+	// that is the end of the job or not
+	if jb.dataSection == 0 && idx == 0 {
+		topHashLevel := jb.index.GetTopHashLevel()
+		if topHashLevel < jb.level {
+			log.Trace("have tophash", "level", jb.level, "ref", hexutil.Encode(data))
+			jb.index.AddTopHash(data)
+		}
+	}
+	jb.writeC <- jobUnit{
+		index: idx,
+		data:  data,
+	}
+}
+
+// process asynchronously dispatches chunk data to the underlying writer
+//
+// runs in a loop until:
+//
+// - branches number of section writes have occurred (one full chunk)
+// - data write is finalized and targetcount for this chunk was already reached
+// - data write is finalized and targetcount is reached on a subsequent job write
+func (jb *job) process() {
+
+	log.Trace("starting job process", "level", jb.level, "sec", jb.dataSection, "target", jb.target)
+
+	var processCount int
+	defer jb.destroy()
+
+	// endCount is set when the data write is finished, AND
+	// the final data section falls within the span of this job
+	// if not, the loop will only exit on Branches writes
OUTER:
+	for {
+		select {
+
+		// enter here if new data is written to the job
+		// TODO: Error if calculated write count exceeds chunk
+		case entry := <-jb.writeC:
+
+			// split the contents to fit the underlying SectionWriter
+			entrySections := len(entry.data) / jb.writer.SectionSize()
+			jb.mu.Lock()
+			endCount := int(jb.endCount)
+			oldProcessCount := processCount
+			processCount += entrySections
+			jb.mu.Unlock()
+			if entry.index == 0 {
+				jb.firstSectionData = entry.data
+			}
+			log.Trace("job entry", "datasection", jb.dataSection, "num sections", entrySections, "level", jb.level, "processCount", oldProcessCount, "endcount", endCount, "index", entry.index, "data", hexutil.Encode(entry.data))
+
+			// TODO: this write is superfluous when the received data is the root hash
+			var offset int
+			for i := 0; i < entrySections; i++ {
+				idx := entry.index + i
+				data := entry.data[offset : offset+jb.writer.SectionSize()]
+				log.Trace("job write", "datasection", jb.dataSection, "level", jb.level, "processCount", oldProcessCount+i, "endcount", endCount, "index", entry.index+i, "data", hexutil.Encode(data))
+				jb.writer.WriteIndexed(idx, data)
+				offset += jb.writer.SectionSize()
+			}
+
+			// since processCount is incremented above, it can only equal endCount if endCount has been set in the case below,
+			// which means the data write has been completed
+			// otherwise, if we have reached the chunk limit, we likewise proceed to hashing
+			if processCount == endCount {
+				log.Trace("quitting writec - endcount", "c", processCount, "level", jb.level)
+				break OUTER
+			}
+			if processCount == jb.writer.Branches() {
+				log.Trace("quitting writec - branches")
+				break OUTER
+			}
+
+		// enter here if data writes have been completed
+		// TODO: this case currently executes for all cycles after the data write is complete for which writes to this job do not happen. perhaps it can be improved
+		case <-jb.doneC:
+			log.Trace("doneloop enter")
+			jb.mu.Lock()
+			jb.doneC = nil
+			log.Trace("doneloop", "level", jb.level, "processCount", processCount, "endcount", jb.endCount)
+			//count := jb.count()
+
+			// if the target count falls within the span of this job
+			// set the endcount so we know we have to do extra calculations for
+			// determining the span in case of an unbalanced tree
+			targetCount := jb.target.Count()
+			jb.endCount = int32(jb.targetCountToEndCount(targetCount))
+			log.Trace("doneloop done", "level", jb.level, "targetcount", jb.target.Count(), "endcount", jb.endCount)
+
+			// if we have reached the end count for this chunk, we proceed to hashing
+			// this case is important when writes to the level happen after this goroutine
+			// registers that data writes have been completed
+			if processCount > 0 && processCount == int(jb.endCount) {
+				log.Trace("quitting donec", "level", jb.level, "processcount", processCount)
+				jb.mu.Unlock()
+				break OUTER
+			}
+			jb.mu.Unlock()
+		}
+	}
+
+	jb.sum()
+}
+
+// sum retrieves the sum generated by the underlying writer and writes it to the corresponding section in the level above
+func (jb *job) sum() {
+
+	targetLevel := jb.target.Level()
+	if targetLevel == jb.level {
+		jb.target.resultC <- jb.index.GetTopHash(jb.level)
+		return
+	}
+
+	// get the size of the span and execute the hash digest of the content
+	size := jb.size()
+	refSize := jb.count() * jb.params.SectionSize
+	jb.writer.SetSpan(size)
+	log.Trace("job sum", "count", jb.count(), "refsize", refSize, "size", size, "datasection", jb.dataSection, "level", jb.level, "targetlevel", targetLevel, "endcount", jb.endCount)
+	ref := jb.writer.SumIndexed(nil, refSize)
+
+	// endCount > 0 means this is the last chunk on the level
+	// the hash from the level below the target level will be the result
+	belowRootLevel := targetLevel - 1
+	if jb.endCount > 0 && jb.level == belowRootLevel {
+		jb.target.resultC <- ref
+		return
+	}
+
+	// retrieve the parent and the corresponding section in it to write to
+	parent := jb.parent()
+	log.Trace("have parent", "level", jb.level, "jb p", fmt.Sprintf("%p", jb), "jbp p", fmt.Sprintf("%p", parent))
+	nextLevel := jb.level + 1
+	parentSection := dataSectionToLevelSection(jb.params, nextLevel, jb.dataSection)
+
+	// in the event that we have a balanced tree and a chunk with a single reference below the target level
+	// we move the single reference up to the penultimate level
+	if jb.endCount == 1 {
+		ref = jb.firstSectionData
+		for parent.level < belowRootLevel {
+			log.Trace("parent write skip", "level", parent.level)
+			oldParent := parent
+			parent = parent.parent()
+			oldParent.destroy()
+			nextLevel += 1
+			parentSection = dataSectionToLevelSection(jb.params, nextLevel, jb.dataSection)
+		}
+	}
+	parent.write(parentSection, ref)
+
+}
+
+// determine whether the given data section count falls within the span of the current job
+func (jb *job) targetWithinJob(targetSection int) (int, bool) {
+	var endIndex int
+	var ok bool
+
+	// the span one level above equals the data size of 128 units of one section on this level
+	// using the span table saves one multiplication
+	//dataBoundary := dataSectionToLevelBoundary(jb.params, jb.level, jb.dataSection)
+	dataBoundary := dataSectionToLevelBoundary(jb.params, jb.level, jb.dataSection)
+	upperLimit := dataBoundary + jb.params.Spans[jb.level+1]
+
+	// the data section is the data section index where the span of this job starts
+	if targetSection >= dataBoundary && targetSection < upperLimit {
+
+		// the data section index must be divided by the corresponding section size on the job's level
+		// then wrapped on the branch period to find the correct section within this job
+		endIndex = (targetSection / jb.params.Spans[jb.level]) % jb.params.Branches
+
+		ok = true
+	}
+	return endIndex, ok
+}
+
+// If the last data index falls within the span, return the appropriate end count for the level
+//
+// Otherwise return 0, which means "job write until limit"
+func (jb *job) targetCountToEndCount(targetCount int) int {
+	endIndex, ok := jb.targetWithinJob(targetCount - 1)
+	if !ok {
+		return 0
+	}
+	return endIndex + 1
+}
+
+// Returns the parent job of the receiver job
+//
+// A new parent job is created if none exists for the slot
+//
+// TODO: consider renaming to getOrCreateParent
+func (jb *job) parent() *job {
+	jb.index.mu.Lock()
+	defer jb.index.mu.Unlock()
+	newLevel := jb.level + 1
+	// Truncate to the even quotient, which is the actual logarithmic boundary of the data section under the span
+	newDataSection := dataSectionToLevelBoundary(jb.params, jb.level+1, jb.dataSection)
+	parent := jb.index.Get(newLevel, newDataSection)
+	if parent != nil {
+		return parent
+	}
+	jbp := newJob(jb.params, jb.target, jb.index, jb.level+1, newDataSection)
+	jbp.start()
+	return jbp
+}
+
+// Next creates the job for the next data section span on the same level as the receiver job
+//
+// This is only meant to be called once for each job; consecutive calls will overwrite the index with a new empty job
+func (jb *job) Next() *job {
+	jbn := newJob(jb.params, jb.target, jb.index, jb.level, jb.dataSection+jb.params.Spans[jb.level+1])
+	jbn.start()
+	return jbn
+}
+
+// cleans up the job; resets the hasher and removes the pointer to the job from the index
+func (jb *job) destroy() {
+	if jb.writer != nil {
+		jb.params.PutWriter(jb.writer)
+	}
+	jb.index.Delete(jb)
+}
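+
+// Example (illustrative sketch, not part of this file): the life cycle of a level 1
+// job as driven by the tests in job_test.go. References are written as they arrive,
+// the target is set once the final data length is known, and the root hash is read
+// from the target's result channel:
+//
+//	tgt := newTarget()
+//	jb := newJob(params, tgt, nil, 1, 0)   // level 1, starting at data section 0
+//	jb.start()                             // attach a writer, spawn process()
+//	jb.write(0, ref0)                      // section references from the level below
+//	jb.write(1, ref1)
+//	tgt.Set(size, sections, level)         // finalize; jobs compute their end counts
+//	root := <-tgt.Done()                   // root hash arrives when the tree resolves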
diff --git a/file/hasher/job_test.go b/file/hasher/job_test.go
new file mode 100644
index 0000000000..1ed4e20d48
--- /dev/null
+++ b/file/hasher/job_test.go
@@ -0,0 +1,620 @@
+package hasher
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"strconv"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/ethereum/go-ethereum/common/hexutil"
+	"github.com/ethersphere/swarm/bmt"
+	"github.com/ethersphere/swarm/file"
+	"github.com/ethersphere/swarm/file/testutillocal"
+	"github.com/ethersphere/swarm/log"
+	"github.com/ethersphere/swarm/testutil"
+	"golang.org/x/crypto/sha3"
+)
+
+const (
+	zeroHex = "0000000000000000000000000000000000000000000000000000000000000000"
+)
+
+// TestTreeParams verifies that params are set correctly by the param constructor
+func TestTreeParams(t *testing.T) {
+
+	params := newTreeParams(dummyHashFunc)
+
+	if params.SectionSize != 32 {
+		t.Fatalf("section: expected %d, got %d", sectionSize, params.SectionSize)
+	}
+
+	if params.Branches != 128 {
+		t.Fatalf("branches: expected %d, got %d", branches, params.Branches)
+	}
+
+	if params.Spans[2] != branches*branches {
+		t.Fatalf("span %d: expected %d, got %d", 2, branches*branches, params.Spans[2])
+	}
+
+}
+
+// TestTarget verifies that the values are set correctly by the target Set() method
+func TestTarget(t *testing.T) {
+
+	tgt := newTarget()
+	tgt.Set(32, 1, 2)
+
+	if tgt.size != 32 {
+		t.Fatalf("target size expected %d, got %d", 32, tgt.size)
+	}
+
+	if tgt.sections != 1 {
+		t.Fatalf("target sections expected %d, got %d", 1, tgt.sections)
+	}
+
+	if tgt.level != 2 {
+		t.Fatalf("target level expected %d, got %d", 2, tgt.level)
+	}
+}
+
+// TestJobTargetWithinDefault verifies the calculation of whether a final data section index
+// falls within a particular job's span, without regard to differing SectionSize
+func TestJobTargetWithinDefault(t *testing.T) {
+	params := newTreeParams(dummyHashFunc)
+	index := newJobIndex(9)
+	tgt := newTarget()
+
+	jb := newJob(params, tgt, index, 1, branches*branches)
+	defer jb.destroy()
+
+	finalSize := chunkSize*branches + chunkSize*2
+	finalCount := dataSizeToSectionCount(finalSize, sectionSize)
+	log.Trace("within test", "size", finalSize, "count", finalCount)
+	c, ok := jb.targetWithinJob(finalCount - 1)
+	if !ok {
+		t.Fatalf("target %d within %d: expected true", finalCount, jb.level)
+	}
+	if c != 1 {
+		t.Fatalf("target %d within %d: expected %d, got %d", finalCount, jb.level, 1, c)
+	}
+}
+
+// TestJobTargetWithinDifferentSections does the same as TestJobTargetWithinDefault but
+// with SectionSize/Branches settings differing between the client target and the underlying writer
+func TestJobTargetWithinDifferentSections(t *testing.T) {
+	dummyHashDoubleFunc := func(_ context.Context) file.SectionWriter {
+		return newDummySectionWriter(chunkSize, sectionSize*2, sectionSize*2, branches/2)
+	}
+	params := newTreeParams(dummyHashDoubleFunc)
+	index := newJobIndex(9)
+	tgt := newTarget()
+
+	//jb := newJob(params, tgt, index, 1, branches*branches)
+	jb := newJob(params, tgt, index, 1, 0)
+	defer jb.destroy()
+
+	//finalSize := chunkSize*branches + chunkSize*2
+	finalSize := chunkSize
+	finalCount := dataSizeToSectionCount(finalSize, sectionSize)
+	log.Trace("within test", "size", finalSize, "count", finalCount)
+	c, ok := jb.targetWithinJob(finalCount - 1)
+	if !ok {
+		t.Fatalf("target %d within %d: expected true", finalCount, jb.level)
+	}
+	if c != 1 {
+		t.Fatalf("target %d within %d: expected %d, got %d", finalCount, jb.level, 1, c)
+	}
+}
+
+// TestNewJob verifies that a job is initialized with the correct values
+func TestNewJob(t *testing.T) {
+
+	params := newTreeParams(dummyHashFunc)
+	params.Debug = true
+
+	tgt := newTarget()
+	jb := newJob(params, tgt, nil, 1, branches*branches+1)
+	if jb.level != 1 {
+		t.Fatalf("job level expected 1, got %d", jb.level)
+	}
+	if jb.dataSection != branches*branches+1 {
+		t.Fatalf("datasectionindex: expected %d, got %d", branches*branches+1, jb.dataSection)
+	}
+	tgt.Set(0, 0, 0)
+	jb.destroy()
+}
+
+// TestJobSize verifies the data size calculation used for calculating the span of data
+// under a particular level reference
+// it tests both a balanced and an unbalanced tree
+func TestJobSize(t *testing.T) {
+	params := newTreeParams(dummyHashFunc)
+	params.Debug = true
+	index := newJobIndex(9)
+
+	tgt := newTarget()
+	jb := newJob(params, tgt, index, 3, 0)
+	jb.cursorSection = 1
+	jb.endCount = 1
+	size := chunkSize*branches + chunkSize
+	sections := dataSizeToSectionIndex(size, sectionSize) + 1
+	tgt.Set(size, sections, 3)
+	jobSize := jb.size()
+	if jobSize != size {
+		t.Fatalf("job size: expected %d, got %d", size, jobSize)
+	}
+	jb.destroy()
+
+	tgt = newTarget()
+	jb = newJob(params, tgt, index, 3, 0)
+	jb.cursorSection = 1
+	jb.endCount = 1
+	size = chunkSize * branches * branches
+	sections = dataSizeToSectionIndex(size, sectionSize) + 1
+	tgt.Set(size, sections, 3)
+	jobSize = jb.size()
+	if jobSize != size {
+		t.Fatalf("job size: expected %d, got %d", size, jobSize)
+	}
+	jb.destroy()
+
+}
+
+// TestJobTarget verifies that the underlying calculation for determining whether
+// a data section index is within a level's span is correct
+func TestJobTarget(t *testing.T) {
+	tgt := newTarget()
+	params := newTreeParams(dummyHashFunc)
+	params.Debug = true
+	index := newJobIndex(9)
+
+	jb := newJob(params, tgt, index, 1, branches*branches)
+
+	// this is less than chunksize * 128
+	// it will not be in the job span
+	finalSize := chunkSize + sectionSize + 1
+	finalSection := dataSizeToSectionIndex(finalSize, sectionSize)
+	c, ok := jb.targetWithinJob(finalSection)
+	if ok {
+		t.Fatalf("targetwithinjob: expected false")
+	}
+	jb.destroy()
+
+	// chunkSize*128+chunkSize*2 (532480) is within chunksize*128 (524288) and chunksize*128*2 (1048576)
+	// it will be within the job span
+	finalSize = chunkSize*branches + chunkSize*2
+	finalSection = dataSizeToSectionIndex(finalSize, sectionSize)
+	c, ok = jb.targetWithinJob(finalSection)
+	if !ok {
+		t.Fatalf("targetwithinjob section %d: expected true", branches*branches)
+	}
+	if c != 1 {
+		t.Fatalf("targetwithinjob section %d: expected %d, got %d", branches*branches, 1, c)
+	}
+	c = jb.targetCountToEndCount(finalSection + 1)
+	if c != 2 {
+		t.Fatalf("targetcounttoendcount section %d: expected %d, got %d", branches*branches, 2, c)
+	}
+	jb.destroy()
+}
+
+// TestJobIndex verifies that the job constructor adds the job to the job index
+// and removes it on job destruction
+func TestJobIndex(t *testing.T) {
+	tgt := newTarget()
+	params := newTreeParams(dummyHashFunc)
+
+	jb := newJob(params, tgt, nil, 1, branches)
+	jobIndex := jb.index
+	jbGot := jobIndex.Get(1, branches)
+	if jb != jbGot {
+		t.Fatalf("jbIndex get: expect %p, got %p", jb, jbGot)
+	}
+	jbGot.destroy()
+	if jobIndex.Get(1, branches) != nil {
+		t.Fatalf("jbIndex delete: expected nil")
+	}
+}
+
+// TestJobGetNext verifies that the new job constructed through the job.Next() method
+// has the correct level and data section index
+func TestJobGetNext(t *testing.T) {
+	tgt := newTarget()
+	params := newTreeParams(dummyHashFunc)
+	params.Debug = true
+
+	jb := newJob(params, tgt, nil, 1, branches*branches)
+	jbn := jb.Next()
+	if jbn == nil {
+		t.Fatalf("parent: nil")
+	}
+	if jbn.level != 1 {
+		t.Fatalf("nextjob level: expected %d, got %d", 1, jbn.level)
+	}
+	if jbn.dataSection != jb.dataSection+branches*branches {
+		t.Fatalf("nextjob section: expected %d, got %d", jb.dataSection+branches*branches, jbn.dataSection)
+	}
+}
+
+// TestJobWriteTwoAndFinish writes two references to a job and sets the job target to two chunks
+// it verifies that the job count after the writes is two, and that the hash is correct
+func TestJobWriteTwoAndFinish(t *testing.T) {
+
+	tgt := newTarget()
+	params := newTreeParams(dummyHashFunc)
+
+	jb := newJob(params, tgt, nil, 1, 0)
+	jb.start()
+	_, data := testutil.SerialData(sectionSize*2, 255, 0)
+	jb.write(0, data[:sectionSize])
+	jb.write(1, data[sectionSize:])
+
+	finalSize := chunkSize * 2
+	finalSection := dataSizeToSectionIndex(finalSize, sectionSize)
+	tgt.Set(finalSize, finalSection-1, 2)
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*199)
+	defer cancel()
+	select {
+	case ref := <-tgt.Done():
+		correctRefHex := "0xe1553e1a3a6b73f96e6fc48318895e401e7db2972962ee934633fa8b3eaaf78b"
+		refHex := hexutil.Encode(ref)
+		if refHex != correctRefHex {
+			t.Fatalf("job write two and finish: expected %s, got %s", correctRefHex, refHex)
+		}
+	case <-ctx.Done():
+		t.Fatalf("timeout: %v", ctx.Err())
+	}
+
+	if jb.count() != 2 {
+		t.Fatalf("jobcount: expected %d, got %d", 2, jb.count())
+	}
+}
+
+// TestJobGetParent verifies that the parent returned from two jobs' parent() calls
+// that are within the same span as the parent chunk of references is the same
+// BUG: not guaranteed to return the same parent when run with eg -count 100
+func TestJobGetParent(t *testing.T) {
+	tgt := newTarget()
+	params := newTreeParams(dummyHashFunc)
+
+	jb := newJob(params, tgt, nil, 1, branches*branches)
+	jb.start()
+	jbp := jb.parent()
+	if jbp == nil {
+		t.Fatalf("parent: nil")
+	}
+	if jbp.level != 2 {
+		t.Fatalf("parent level: expected %d, got %d", 2, jbp.level)
+	}
+	if jbp.dataSection != 0 {
+		t.Fatalf("parent data section: expected %d, got %d", 0, jbp.dataSection)
+	}
+	jbGot := jb.index.Get(2, 0)
+	if jbGot == nil {
+		t.Fatalf("index get: nil")
+	}
+
+	jbNext := jb.Next()
+	jbpNext := jbNext.parent()
+	if jbpNext != jbp {
+		t.Fatalf("next parent: expected %p, got %p", jbp, jbpNext)
+	}
+}
+
+// TestJobWriteParentSection verifies that a data write translates to a write
+// in the correct section of its parent
+func TestJobWriteParentSection(t *testing.T) {
+	tgt := newTarget()
+	params := newTreeParams(dummyHashFunc)
+	index := newJobIndex(9)
+
+	jb := newJob(params, tgt, index, 1, 0)
+	jbn := jb.Next()
+	_, data := testutil.SerialData(sectionSize*2, 255, 0)
+	jbn.write(0, data[:sectionSize])
+	jbn.write(1, data[sectionSize:])
+
+	finalSize := chunkSize*branches + chunkSize*2
+	finalSection := dataSizeToSectionIndex(finalSize, sectionSize)
+	tgt.Set(finalSize, finalSection, 3)
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
+	defer cancel()
+	select {
+	case <-tgt.Done():
+		t.Fatalf("unexpected done")
+	case <-ctx.Done():
+	}
+	jbnp := jbn.parent()
+	if jbnp.count() != 1 {
+		t.Fatalf("parent count: expected %d, got %d", 1, jbnp.count())
+	}
+	correctRefHex := "0xe1553e1a3a6b73f96e6fc48318895e401e7db2972962ee934633fa8b3eaaf78b"
+
+	// extract the data in section 2 from the writer
+	// TODO: overload writer to provide a get method to extract data to improve clarity
+	w := jbnp.writer.(*dummySectionWriter)
+	w.mu.Lock()
+	parentRef := w.data[32:64]
+	w.mu.Unlock()
+	parentRefHex := hexutil.Encode(parentRef)
+	if parentRefHex != correctRefHex {
+		t.Fatalf("parent data: expected %s, got %s", correctRefHex, parentRefHex)
+	}
+}
+
+// TestJobWriteFull verifies the hashing result of the write of a balanced tree
+// where the simulated tree is chunkSize*branches worth of data
+func TestJobWriteFull(t *testing.T) {
+
+	tgt := newTarget()
+	params := newTreeParams(dummyHashFunc)
+
+	jb := newJob(params, tgt, nil, 1, 0)
+	jb.start()
+	_, data := testutil.SerialData(chunkSize, 255, 0)
+	for i := 0; i < chunkSize; i += sectionSize {
+		jb.write(i/sectionSize, data[i:i+sectionSize])
+	}
+
+	tgt.Set(chunkSize, branches, 2)
+	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
+	defer cancel()
+	select {
+	case ref := <-tgt.Done():
+		correctRefHex := "0x1ed31ae32fe570b69b01f800fbdec1f17b7ea6f0348216d6d79df91ddf28344e"
+		refHex := hexutil.Encode(ref)
+		if refHex != correctRefHex {
+			t.Fatalf("job write full: expected %s, got %s", correctRefHex, refHex)
+		}
+	case <-ctx.Done():
+		t.Fatalf("timeout: %v", ctx.Err())
+	}
+	if jb.count() != branches {
+		t.Fatalf("jobcount: expected %d, got %d", branches, jb.count())
+	}
+}
+
+// TestJobWriteSpan uses the bmt asynchronous hasher
+// it verifies that a result can be attained at chunkSize+sectionSize*2 references
+// which translates to chunkSize*branches+chunkSize*2 bytes worth of data
+func TestJobWriteSpan(t *testing.T) {
+
+	tgt := newTarget()
+	hashFunc := testutillocal.NewBMTHasherFunc(0)
+	params := newTreeParams(hashFunc)
+
+	jb := newJob(params, tgt, nil, 1, 0)
+	jb.start()
+	_, data := testutil.SerialData(chunkSize+sectionSize*2, 255, 0)
+
+	for i := 0; i < chunkSize; i += sectionSize {
+		jb.write(i/sectionSize, data[i:i+sectionSize])
+	}
+	jbn := jb.Next()
+	jbn.write(0, data[chunkSize:chunkSize+sectionSize])
+	jbn.write(1, data[chunkSize+sectionSize:])
+	finalSize := chunkSize*branches + chunkSize*2
+	finalSection := dataSizeToSectionIndex(finalSize, sectionSize)
+	tgt.Set(finalSize, finalSection, 3)
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*1000)
+	defer cancel()
+	select {
+	case ref := <-tgt.Done():
+		// TODO: double check that this hash is correct
+		refCorrectHex := "0xee56134cab34a5a612648dcc22d88b7cb543081bd144906dfc4fa93802c9addf"
+		refHex := hexutil.Encode(ref)
+		if refHex != refCorrectHex {
+			t.Fatalf("writespan sequential: expected %s, got %s", refCorrectHex, refHex)
+		}
+	case <-ctx.Done():
+		t.Fatalf("timeout: %v", ctx.Err())
+	}
+
+	sz := jb.size()
+	if sz != chunkSize*branches {
+		t.Fatalf("job 1 size: expected %d, got %d", chunkSize*branches, sz)
+	}
+
+	sz = jbn.size()
+	if sz != chunkSize*2 {
+		t.Fatalf("job 2 size: expected %d, got %d", chunkSize*2, sz)
+	}
+}
+
+// TestJobWriteSpanShuffle does the same as TestJobWriteSpan but
+// shuffles the indices of the first chunk write
+// verifying that sequential use of the underlying hasher is not required
+func TestJobWriteSpanShuffle(t *testing.T) {
+
+	tgt := newTarget()
+	hashFunc := testutillocal.NewBMTHasherFunc(0)
+	params := newTreeParams(hashFunc)
+
+	jb := newJob(params, tgt, nil, 1, 0)
+	jb.start()
+	_, data := testutil.SerialData(chunkSize+sectionSize*2, 255, 0)
+
+	var idxs []int
+	for i := 0; i < branches; i++ {
+		idxs = append(idxs, i)
+	}
+	rand.Shuffle(branches, func(i int, j int) {
+		idxs[i], idxs[j] = idxs[j], idxs[i]
+	})
+	for _, idx := range idxs {
+		jb.write(idx, data[idx*sectionSize:idx*sectionSize+sectionSize])
+	}
+
+	jbn := jb.Next()
+	jbn.write(0, data[chunkSize:chunkSize+sectionSize])
+	jbn.write(1, data[chunkSize+sectionSize:])
+	finalSize := chunkSize*branches + chunkSize*2
+	finalSection := dataSizeToSectionIndex(finalSize, sectionSize)
+	tgt.Set(finalSize, finalSection, 3)
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
+	defer cancel()
+	select {
+	case ref := <-tgt.Done():
+		refCorrectHex := "0xee56134cab34a5a612648dcc22d88b7cb543081bd144906dfc4fa93802c9addf"
+		refHex := hexutil.Encode(ref)
+		jbparent := jb.parent()
+		jbnparent := jbn.parent()
+		log.Info("succeeding", "jb count", jb.count(), "jbn count", jbn.count(), "jb parent count", jbparent.count(), "jbn parent count", jbnparent.count())
+		if refHex != refCorrectHex {
+			t.Fatalf("writespan shuffle: expected %s, got %s", refCorrectHex, refHex)
+		}
+	case <-ctx.Done():
+
+		jbparent := jb.parent()
+		jbnparent := jbn.parent()
+		log.Error("failing", "jb count", jb.count(), "jbn count", jbn.count(), "jb parent count", jbparent.count(), "jbn parent count", jbnparent.count(), "jb parent p", fmt.Sprintf("%p", jbparent), "jbn parent p", fmt.Sprintf("%p", jbnparent))
+		t.Fatalf("timeout: %v", ctx.Err())
+	}
+
+	sz := jb.size()
+	if sz != chunkSize*branches {
+		t.Fatalf("job size: expected %d, got %d", chunkSize*branches, sz)
+	}
+
+	sz = jbn.size()
+	if sz != chunkSize*2 {
+		t.Fatalf("job size: expected %d, got %d", chunkSize*2, sz)
+	}
+}
+
+// TestJobVector executes the barebones functionality of the hasher
+// and verifies it against source-of-truth results generated from the reference hasher
+// for the same data
+// TODO: vet dynamically against the referencefilehasher instead of the expect vector
+func TestJobVector(t *testing.T) {
+	poolSync := bmt.NewTreePool(sha3.NewLegacyKeccak256, branches, bmt.PoolSize)
+	dataHash := bmt.New(poolSync)
+	hashFunc := testutillocal.NewBMTHasherFunc(0)
+	params := newTreeParams(hashFunc)
+	var mismatch int
+
+	for i := start; i < end; i++ {
+		tgt := newTarget()
+		dataLength := dataLengths[i]
+		_, data := testutil.SerialData(dataLength, 255, 0)
+		jb := newJob(params, tgt, nil, 1, 0)
+		jb.start()
+		count := 0
+		log.Info("test vector", "length", dataLength)
+		for i := 0; i < dataLength; i += chunkSize {
+			ie := i + chunkSize
+			if ie > dataLength {
+				ie = dataLength
+			}
+			writeSize := ie - i
+			dataHash.Reset()
+			dataHash.SetSpan(writeSize)
+			c, err := dataHash.Write(data[i:ie])
+			if err != nil {
+				jb.destroy()
+				t.Fatalf("data ref fail: %v", err)
+			}
+			if c != ie-i {
+				jb.destroy()
+				t.Fatalf("data ref short write: expect %d, got %d", ie-i, c)
+			}
+			ref := dataHash.Sum(nil) //, writeSize)
+			log.Debug("data ref", "i", i, "ie", ie, "data", hexutil.Encode(ref))
+			jb.write(count, ref)
+			count += 1
+			if ie%(chunkSize*branches) == 0 {
+				jb = jb.Next()
+				count = 0
+			}
+		}
+		dataSections := dataSizeToSectionIndex(dataLength, params.SectionSize)
+		tgt.Set(dataLength, dataSections, getLevelsFromLength(dataLength, params.SectionSize, params.Branches))
+		eq := true
+		ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*1000)
+		defer cancel()
+		select {
+		case ref := <-tgt.Done():
+			refCorrectHex := "0x" + expected[i]
+			refHex := hexutil.Encode(ref)
+			if refHex != refCorrectHex {
+				mismatch++
+				eq = false
+			}
+			t.Logf("[%7d+%4d]\t%v\tref: %x\texpect: %s", dataLength/chunkSize, dataLength%chunkSize, eq, ref, expected[i])
+		case <-ctx.Done():
+			t.Fatalf("timeout: %v", ctx.Err())
+		}
+
+	}
+	if mismatch > 0 {
+		t.Fatalf("mismatches: %d/%d", mismatch, end-start)
+	}
+}
+
+// BenchmarkJob generates benchmarks that are comparable to the pyramid hasher
+func BenchmarkJob(b *testing.B) {
+	for i := start; i < end; i++ {
+		b.Run(fmt.Sprintf("%d/%d", i, dataLengths[i]), benchmarkJob)
+	}
+}
+
+func benchmarkJob(b *testing.B) {
+	params := strings.Split(b.Name(), "/")
+	dataLengthParam, err := strconv.ParseInt(params[2], 10, 64)
+	if err != nil {
+		b.Fatal(err)
+	}
+	dataLength := int(dataLengthParam)
+
+	poolSync := bmt.NewTreePool(sha3.NewLegacyKeccak256, branches, bmt.PoolSize)
+	dataHash := bmt.New(poolSync)
+	hashFunc := testutillocal.NewBMTHasherFunc(0)
+	treeParams := newTreeParams(hashFunc)
+	_, data := testutil.SerialData(dataLength, 255, 0)
+
+	for j := 0; j < b.N; j++ {
+		tgt := newTarget()
+		jb := newJob(treeParams, tgt, nil, 1, 0)
+		jb.start()
+		count := 0
+		for i := 0; i < dataLength; i += chunkSize {
+			ie := i + chunkSize
+			if ie > dataLength {
+				ie = dataLength
+			}
+			dataHash.Reset()
+			c, err := dataHash.Write(data[i:ie])
+			if err != nil {
+				jb.destroy()
+				b.Fatalf("data ref fail: %v", err)
+			}
+			if c != ie-i {
+				jb.destroy()
+				b.Fatalf("data ref short write: expect %d, got %d", ie-i, c)
+			}
+			ref := dataHash.Sum(nil)
+			jb.write(count, ref)
+			count += 1
+			if ie%(chunkSize*branches) == 0 {
+				jb = jb.Next()
+				count = 0
+			}
+		}
+		dataSections := dataSizeToSectionIndex(dataLength, treeParams.SectionSize)
+		tgt.Set(dataLength, dataSections, getLevelsFromLength(dataLength, treeParams.SectionSize, treeParams.Branches))
+		ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*1000)
+		defer cancel()
+		select {
+		case <-tgt.Done():
+		case <-ctx.Done():
+			b.Fatalf("timeout: %v", ctx.Err())
+		}
+	}
+}
diff --git a/file/hasher/reference_test.go b/file/hasher/reference_test.go
index d4deef5c0b..4d4c4e974e 100644
--- a/file/hasher/reference_test.go
+++ b/file/hasher/reference_test.go
@@ -15,6 +15,27 @@ import (
 	"golang.org/x/crypto/sha3"
 )
 
+// referenceBmtWrapper encapsulates the bmt hasher in order to implement the SectionWriter interface
+type referenceBmtWrapper struct {
+	*bmt.Hasher
+}
+
+// implements file.SectionWriter
+func (r *referenceBmtWrapper) SetWriter(hashFunc file.SectionWriterFunc) file.SectionWriter {
+	log.Warn("BMT hasher does not currently support SectionWriter chaining")
+	return r
+}
+
+// implements file.SectionWriter
+func (r *referenceBmtWrapper) SumIndexed(b []byte, _ int) []byte {
+	return r.Sum(b)
+}
+
+// implements file.SectionWriter
+func (r *referenceBmtWrapper) WriteIndexed(_ int, b []byte) {
+	r.Write(b)
+}
+
 // TestManualDanglingChunk is a test script explicitly hashing and writing every individual level in the dangling chunk edge case
 // we use a balanced tree with data size of chunkSize*branches, and a single chunk of data
 // this case is chosen because it produces the wrong result in the pyramid hasher at the time of writing (master commit hash 4928d989ebd0854d993c10c194e61a5a5455e4f9)
@@ -90,7 +111,7 @@ func TestReferenceHasherVector(t *testing.T) {
 
 	hashFunc := func(_ context.Context) file.SectionWriter {
 		pool := bmt.NewTreePool(sha3.NewLegacyKeccak256, branches, bmt.PoolSize)
-		return bmt.New(pool)
+		return &referenceBmtWrapper{Hasher: bmt.New(pool)}
 	}
 	params := newTreeParams(hashFunc)
 	var mismatch int
@@ -128,7 +149,7 @@ func benchmarkReferenceHasher(b *testing.B) {
 	}
 	hashFunc := func(_ context.Context) file.SectionWriter {
 		pool := bmt.NewTreePool(sha3.NewLegacyKeccak256, branches, bmt.PoolSize)
-		return bmt.New(pool)
+		return &referenceBmtWrapper{Hasher: bmt.New(pool)}
 	}
 	params := newTreeParams(hashFunc)
 	b.ResetTimer()
diff --git a/file/hasher/target.go b/file/hasher/target.go
new file mode 100644
index 0000000000..a6194308f3
--- /dev/null
+++ b/file/hasher/target.go
@@ -0,0 +1,70 @@
+package hasher
+
+import (
+	"sync"
+
+	"github.com/ethersphere/swarm/log"
+)
+
+// target is passed to a job to determine at which data lengths and levels a job should terminate
+type target struct {
+	size     int32         // bytes written
+	sections int32         // sections written
+	level    int32         // target level calculated from bytes written against branching factor and section size
+	resultC  chan []byte   // channel to receive root hash
+	doneC    chan struct{} // when this channel is closed all jobs will calculate their end write count
+	mu       sync.Mutex
+}
+
+// target constructor
+func newTarget() *target {
+	return &target{
+		resultC: make(chan []byte),
+		doneC:   make(chan struct{}),
+	}
+}
+
+// Set is called when the final length of the data to be written is known
+//
+// TODO: method can be simplified to calculate sections and level internally
+func (t *target) Set(size int, sections int, level int) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.size = int32(size)
+	t.sections = int32(sections)
+	t.level = int32(level)
+	log.Trace("target set", "size", t.size, "section", t.sections, "level", t.level)
+	close(t.doneC)
+}
+
+// Count returns the total section count for the target
+//
+// it should only be called after Set()
+func (t *target) Count() int {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	return int(t.sections) + 1
+}
+
+// Level returns the level of the target
+//
+// it should only be called after Set()
+func (t *target) Level() int {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	return int(t.level)
+}
+
+// Size returns the byte count for the target
+//
+// it should only be called after Set()
+func (t *target) Size() int {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	return int(t.size)
+}
+
+// Done returns the channel in which the root hash will be sent
+func (t *target) Done() <-chan []byte {
+	return t.resultC
+}
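+
+// Example (illustrative sketch, not part of this file): deriving the arguments to
+// Set() from a known data length, as done in TestJobVector, using the helpers
+// defined in util.go below:
+//
+//	sections := dataSizeToSectionIndex(dataLength, params.SectionSize)
+//	level := getLevelsFromLength(dataLength, params.SectionSize, params.Branches)
+//	tgt.Set(dataLength, sections, level) // closes doneC; jobs compute their end counts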
diff --git a/file/hasher/util.go b/file/hasher/util.go
index 141fd1d114..e55f78fa5d 100644
--- a/file/hasher/util.go
+++ b/file/hasher/util.go
@@ -4,8 +4,9 @@ import (
 	"math"
 )
 
-// TODO: level 0 should be SectionSize() not Branches()
 // generates a dictionary of maximum span lengths per level represented by one SectionSize() of data
+//
+// TODO: level 0 should be SectionSize() not Branches()
 func generateSpanSizes(branches int, levels int) []int {
 	spans := make([]int, levels)
 	span := 1
@@ -16,9 +17,9 @@ func generateSpanSizes(branches int, levels int) []int {
 	return spans
 }
 
+// calculate the last level index which a particular data section count will result in. The returned level will be the level of the root hash
+//
 // TODO: use params instead of sectionSize, branches
-// calculate the last level index which a particular data section count will result in.
-// the returned level will be the level of the root hash
 func getLevelsFromLength(l int, sectionSize int, branches int) int {
 	if l == 0 {
 		return 0
@@ -29,3 +30,29 @@ func getLevelsFromLength(l int, sectionSize int, branches int) int {
 
 	return int(math.Log(float64(c))/math.Log(float64(branches)) + 1)
 }
+
+// calculates the section index of the given byte size
+func dataSizeToSectionIndex(length int, sectionSize int) int {
+	return (length - 1) / sectionSize
+}
+
+// calculates the section count of the given byte size
+func dataSizeToSectionCount(length int, sectionSize int) int {
+	return dataSizeToSectionIndex(length, sectionSize) + 1
+}
+
+// calculates the corresponding level section for a data section
+func dataSectionToLevelSection(p *treeParams, lvl int, sections int) int {
+	span := p.Spans[lvl]
+	return sections / span
+}
+
+// calculates the lower data section boundary of a level for which a data section is contained
+//
+// the higher level use is to determine whether the final data section written falls within a certain level's span
+func dataSectionToLevelBoundary(p *treeParams, lvl int, section int) int {
+	span := p.Spans[lvl+1]
+	spans := section / span
+	spanBytes := spans * span
+	return spanBytes
+}
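+
+// Worked example (illustrative only, assuming sectionSize = 32 and branches = 128,
+// with p a *treeParams built from those values):
+//
+//	generateSpanSizes(128, 3)             // -> [1, 128, 16384]: data sections per reference, per level
+//	dataSizeToSectionIndex(8192, 32)      // -> 255: (8192-1)/32, zero-based index of the last section
+//	dataSizeToSectionCount(8192, 32)      // -> 256: last index + 1
+//	dataSectionToLevelSection(p, 1, 256)  // -> 2: 256 / Spans[1] = 256 / 128
+//	dataSectionToLevelBoundary(p, 1, 300) // -> 0: 300 truncated to a multiple of Spans[2] = 16384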
diff --git a/file/testutillocal/hash.go b/file/testutillocal/hash.go
new file mode 100644
index 0000000000..f1decde683
--- /dev/null
+++ b/file/testutillocal/hash.go
@@ -0,0 +1,27 @@
+package testutillocal
+
+import (
+	"context"
+
+	"github.com/ethersphere/swarm/bmt"
+	"github.com/ethersphere/swarm/file"
+	asyncbmt "github.com/ethersphere/swarm/file/bmt"
+	"golang.org/x/crypto/sha3"
+)
+
+var (
+	branches = 128
+)
+
+// NewBMTHasherFunc is a test helper that creates a new asynchronous hasher with a
+// specified pool size; a poolSize of 0 selects the default bmt.PoolSize
+func NewBMTHasherFunc(poolSize int) file.SectionWriterFunc {
+	if poolSize == 0 {
+		poolSize = bmt.PoolSize
+	}
+	poolAsync := bmt.NewTreePool(sha3.NewLegacyKeccak256, branches, poolSize)
+	refHashFunc := func(ctx context.Context) file.SectionWriter {
+		bmtHasher := bmt.New(poolAsync)
+		return asyncbmt.NewAsyncHasher(ctx, bmtHasher, false, nil)
+	}
+	return refHashFunc
+}
diff --git a/file/types.go b/file/types.go
index 5ad84fce90..0d711bdbdd 100644
--- a/file/types.go
+++ b/file/types.go
@@ -5,12 +5,18 @@ import (
 	"hash"
 )
 
+// SectionWriterFunc defines the required function signature to be used to create a SectionWriter
 type SectionWriterFunc func(ctx context.Context) SectionWriter
 
+// SectionWriter is a chainable interface for processing of chunk data
+//
+// Implementations should pass data to the underlying writer before performing their own sum calculations
 type SectionWriter interface {
-	hash.Hash                                           // Write,Sum,Reset,Size,BlockSize
-	SetWriter(hashFunc SectionWriterFunc) SectionWriter // chain another SectionWriter the current instance
-	SetSpan(length int)                                 // set data span of chunk
-	SectionSize() int                                   // section size of this SectionWriter
-	Branches() int                                      // branch factor of this SectionWriter
+	hash.Hash                                                 // Write,Sum,Reset,Size,BlockSize
+	SetWriter(hashFunc SectionWriterFunc) SectionWriter       // chain another SectionWriter to the current instance
+	SetSpan(length int)                                       // set data span of chunk
+	SectionSize() int                                         // section size of this SectionWriter
+	Branches() int                                            // branch factor of this SectionWriter
+	SumIndexed(prepended_data []byte, span_length int) []byte // Blocking call returning the sum of the data from the underlying writer, setting the final data length to span_length
+	WriteIndexed(int, []byte)                                 // Write to a particular data section, enabling asynchronous writing to any position of any level
 }
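+
+// Example (illustrative sketch): how the two indexed calls added above let a client
+// write sections out of order and still obtain a deterministic sum, assuming w is
+// any SectionWriter implementation such as the AsyncHasher, and refA/refB are two
+// section references with dataLength the byte length of the data they span:
+//
+//	w.WriteIndexed(1, refB)                   // sections may arrive in any order
+//	w.WriteIndexed(0, refA)
+//	w.SetSpan(dataLength)                     // span of data represented by the chunk
+//	ref := w.SumIndexed(nil, 2*w.SectionSize()) // blocking; digest over span plus sections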
diff --git a/go.mod b/go.mod
index a64125cf9a..ea828c11a3 100644
--- a/go.mod
+++ b/go.mod
@@ -36,6 +36,7 @@ require (
 	github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c // indirect
 	github.com/naoina/go-stringutil v0.1.0 // indirect
 	github.com/naoina/toml v0.0.0-20170918210437-9fafd6967416
+	github.com/nolash/mockbytes v0.0.0-20200401133213-4cadbd154ff6 // indirect
 	github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
 	github.com/opencontainers/image-spec v1.0.1 // indirect
 	github.com/opencontainers/runc v0.1.1 // indirect
@@ -51,6 +52,7 @@ require (
 	github.com/uber/jaeger-client-go v0.0.0-20180607151842-f7e0d4744fa6
 	github.com/uber/jaeger-lib v0.0.0-20180615202729-a51202d6f4a7 // indirect
 	github.com/vbauerster/mpb v3.4.0+incompatible
+	gitlab.com/nolash/go-mockbytes v0.0.2 // indirect
 	go.uber.org/atomic v1.4.0 // indirect
 	golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4
 	golang.org/x/net v0.0.0-20190724013045-ca1201d0de80
diff --git a/go.sum b/go.sum
index 3f65912b67..8ea748f65f 100644
--- a/go.sum
+++ b/go.sum
@@ -66,6 +66,7 @@ github.com/elastic/gosigar v0.0.0-20180330100440-37f05ff46ffa/go.mod h1:cdorVVzy
 github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
 github.com/ethereum/go-ethereum v1.9.2 h1:RMIHDO/diqXEgORSVzYx8xW9x2+S32PoAX5lQwya0Lw=
 github.com/ethereum/go-ethereum v1.9.2/go.mod h1:PwpWDrCLZrV+tfrhqqF6kPknbISMHaJv9Ln3kPCZLwY=
+github.com/ethersphere/bmt v0.0.0-20200401123433-67173b18a663 h1:MXvgOFHffdlMLiwKrfiVIda9/OCBZhjus02w9z3rIds=
 github.com/ethersphere/go-sw3 v0.2.1 h1:+i660uWzhRbT1YO7MAeuxzn+jUeYOTc8rGRVjsKaw+4=
 github.com/ethersphere/go-sw3 v0.2.1/go.mod h1:HukT0aZ6QdW/d7zuD/0g5xlw6ewu9QeqHojxLDsaERQ=
 github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
@@ -186,6 +187,8 @@ github.com/naoina/go-stringutil v0.1.0 h1:rCUeRUHjBjGTSHl0VC00jUPLz8/F9dDzYI70Hz
 github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0=
 github.com/naoina/toml v0.0.0-20170918210437-9fafd6967416 h1:9M852Z3gvzUmyFvy+TruhDWCwcIG3cZWg/+Eh8VkR7M=
 github.com/naoina/toml v0.0.0-20170918210437-9fafd6967416/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E=
+github.com/nolash/mockbytes v0.0.0-20200401133213-4cadbd154ff6 h1:3VxaUbphSn8+igZjC75xdy4okZdRbE9u8QlfgrzIty4=
+github.com/nolash/mockbytes v0.0.0-20200401133213-4cadbd154ff6/go.mod h1:yRWDp8wo0IXx2FReB9wTh/0E8oezBIDFdhYE1qUQMM0=
 github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
 github.com/olekukonko/tablewriter v0.0.0-20190409134802-7e037d187b0c h1:2j4kdCOg5xiOVCTQpv0SgbzndaVJKliD6oRbMxTw6v4=
 github.com/olekukonko/tablewriter v0.0.0-20190409134802-7e037d187b0c/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
@@ -273,6 +276,8 @@ github.com/vbauerster/mpb v3.4.0+incompatible h1:mfiiYw87ARaeRW6x5gWwYRUawxaW1tL
 github.com/vbauerster/mpb v3.4.0+incompatible/go.mod h1:zAHG26FUhVKETRu+MWqYXcI70POlC6N8up9p1dID7SU=
 github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208 h1:1cngl9mPEoITZG8s8cVcUy5CeIBYhEESkOB7m6Gmkrk=
 github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208/go.mod h1:IotVbo4F+mw0EzQ08zFqg7pK3FebNXpaMsRy2RT+Ees=
+gitlab.com/nolash/go-mockbytes v0.0.2 h1:qGF5eH+eBxCfNdw+6v5U1Z4ddkPOdVx01Aczp10jCEY=
+gitlab.com/nolash/go-mockbytes v0.0.2/go.mod h1:3iOEPU9bRvF/FBBmiBdWCxvuPmKqQ6cPoX8Ro2yoILc=
 go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
 go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=