-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfilebuffer.go
159 lines (139 loc) · 5.05 KB
/
filebuffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package dit
import (
"bufio"
"bytes"
"fmt"
"io"
"os"
"path/filepath"
)
// FileBuffer embeds a buffering IO object from the bufio package. It implements
// an io.ReadWriteCloser object and has a temporary buffer for the most recent
// io operation it has witnessed.
type FileBuffer struct {
// r/w is a buffered reader/writer. the underlying type of the buffered
// object is determined by whether this is used for reading/writing to
// the underlying data source
r *bufio.Reader
w *bufio.Writer
// the file so we can close it when we are done
f io.ReadWriteCloser
// buf keeps the most recents data read/written from/to the underlying data
// source for retransmission
buf *bytes.Buffer
}
// NewFileBufferFunc returns the request and a closure to open/create file and
// embed it in a buffered io object for efficient reading/writing operations
func NewFileBuffer() *FileBuffer {
return &FileBuffer{buf: new(bytes.Buffer)}
}
func (f *FileBuffer) WithRequest(op Opcode, file io.ReadWriteCloser) {
f.f = file
switch op {
case Rrq:
f.r = bufio.NewReader(file)
case Wrq:
f.w = bufio.NewWriter(file)
}
}
// checks if the file the buffer was created from is the same as other file
func (f *FileBuffer) Is(name string) bool {
fi, ok := f.f.(*os.File)
if !ok {
return false
}
return filepath.Base(fi.Name()) == filepath.Base(name)
}
func (f *FileBuffer) Reset() {
f.buf.Reset()
}
// Read tries to read exactly len(b) from the underlying buffered io object into
// b. If returns the the number of bytes copied and an error if fewer
// than len(b) bytes were read. It returns an io.EOF if no bytes are read and
// an io.ErrUnexpectedEOF if an io.EOF is ecountered while reading from source
func (f *FileBuffer) Read(b []byte) (int, error) {
return io.ReadFull(f.r, b)
}
// Write tries to write len(p) bytes to the underlying data stream. the
// behaviour of this funtion is defined by io.Writer and bufio.Writer.Write
func (f *FileBuffer) Write(b []byte) (int, error) {
return f.w.Write(b)
}
// ReadNext tries to return the next set of len(b) bytes from the
// underlying data source. Keeping the bytes read in a temporary buffer
// incase there is the need to retransmit it.
func (f *FileBuffer) ReadNext(b []byte) (int, error) {
// read next bunch of bytes from underlying storage
read, err := f.Read(b)
// reset the temporary buffer and copy bytes from underlying data
// source into it. writing only the bytes read from storage
if read > 0 {
f.buf.Reset()
if n, err := f.buf.Write(b[:read]); err != nil {
return read, fmt.Errorf("dit: err writting to tmp buffer: %w", err)
} else if read != n {
return read, fmt.Errorf("dit: tmp buffer write exp %d bytes, wrote %d bytes", read, n)
}
}
// at this stage we have either;
// 1. read exactly len(b) bytes and have written it to tmp buffer
// 2. read less than len(b) bytes and have written it to tmp buffer
// 3. read nothing and written nothing to tmp buffer
return read, err
}
// WriteNext tries to write the next set of len(p) bytes to the underlying data
// stream, keeping the same amount of bytes written in a temporary buffer.
// It returns the number of bytes written from p if the write stopped early,
// if the write to the temporary buffer results in an error or if the number
// of bytes written to temporary buffer is less than the number written to
// underlying data source
func (f *FileBuffer) WriteNext(b []byte) (int, error) {
// try to write len(b) bytes to the underlying storage
wrote, err := f.Write(b)
// if we wrote something, we have to keep what was written in the
// underlying storage for keeps
if wrote > 0 {
if n, err := f.buf.Write(b[:wrote]); err != nil {
return wrote, fmt.Errorf("dit: tmp buffer write: %w", err)
} else if n != wrote {
return wrote, fmt.Errorf("dit: tmp buffer write: expected %d, got %d", wrote, n)
}
}
// at this stage we have either;
// 1. successfully written everything to the underlying storage and tmp
// buffer
// 2. written something to underlying storage and tmp buffer
// 3. written nothing to underlying storage and tmp buffer
// either we stop and return the errors and bytes written
return wrote, err
}
// BufferLen returns the length of the temporary buffer storing the most
// recent data from/to the underlying data stream
func (f *FileBuffer) BufferLen() int {
return f.buf.Len()
}
// ReadBuffer tries to copy len(b) bytes from the temporary buffer into b and
// returns the number of bytes copied
//
// if you want exactly all the amount of data in the buffer then you have
// to supply a buffer with length >= f.BufferLen()
func (f *FileBuffer) ReadBuffer(b []byte) int {
return copy(b, f.buf.Bytes())
}
// BufferedObject returns the underlying reader or writer depending on the
// request. It returns a reader when request is a read request and a writer
// if request if a write request
func (f *FileBuffer) BufferedObject() any {
if f.r != nil {
return f.r
}
return f.w
}
// Close resources associated with buffered io operations
func (f *FileBuffer) Close() error {
if f.w != nil {
return f.w.Flush()
}
f.buf.Reset()
return nil
}