-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathprogress.go
More file actions
248 lines (217 loc) · 6.49 KB
/
progress.go
File metadata and controls
248 lines (217 loc) · 6.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
/*
Package progressio contains io.Reader and io.Writer wrappers to easily get
progress feedback, including speed/sec, average speed, %, time remaining,
size, transferred size, ... over a channel in a progressio.Progress object.
Note that the returned object implements the io.Closer interface: you must
close every progressio.ProgressReader and progressio.ProgressWriter object in
order to clean everything up.
Usage is pretty simple:
preader, pchan := progressio.NewProgressReader(myreader, -1)
defer preader.Close()
go func() {
for p := range pchan {
fmt.Printf("Progress: %s\n", p.String())
}
}()
// read from your new reader object
io.Copy(mywriter, preader)
A helper function is available that opens a file, determines its size, and
wraps its os.File io.Reader object:
if pr, pc, err := progressio.NewProgressFileReader(myfile); err != nil {
return err
} else {
defer pr.Close()
go func() {
for p := range pc {
fmt.Printf("Progress: %s\n", p.String())
}
}()
// read from your new reader object
io.Copy(mywriter, pr)
}
A wrapper for an io.WriteCloser is available too, but no helper function
is available to write to an os.File since the target size is not known.
Usually, wrapping the io.Writer is more accurate, since writing potentially
takes up more time and happens last. Usage is similar to wrapping the
io.Reader:
pwriter, pchan := progressio.NewProgressWriter(mywriter, -1)
defer pwriter.Close()
go func() {
for p := range pchan {
fmt.Printf("Progress: %s\n", p.String())
}
}()
// write to your new writer object
io.Copy(pwriter, myreader)
Note that you can also implement your own formatting. See the String() method
implementation or consult the Progress struct layout and documentation.
*/
package progressio
import (
"fmt"
"time"
)
// UpdateFreq is the interval used to throttle progress updates sent over the
// channels: updateProgress compares the time elapsed since the last sent
// update against this value.
const UpdateFreq = 100 * time.Millisecond
// timeSlots is the number of recent samples kept in the ring buffers that are
// used to compute the current transfer speed.
const timeSlots = 5
// Progress is the object sent back over the progress channel. Each value is a
// snapshot of the transfer taken when the update was produced.
type Progress struct {
Transferred int64 // Transferred data in bytes
TotalSize int64 // Total size of the transfer in bytes. <= 0 if size is unknown.
Percent float64 // If the size is known, the progress of the transfer in %, truncated to two decimals
SpeedAvg int64 // Bytes/sec average over the entire transfer; -1 when not (yet) available
Speed int64 // Bytes/sec of the last few reads/writes; -1 when not (yet) available
Remaining time.Duration // Estimated time remaining, only available if the size is known; -1 otherwise
StartTime time.Time // When the transfer was started
StopTime time.Time // Only set on the final update: when the transfer was stopped
}
// ioProgress tracks the state of a single transfer and publishes Progress
// snapshots on ch.
type ioProgress struct {
size int64 // announced total size in bytes; copied into Progress.TotalSize
progress int64 // bytes transferred so far
ch chan Progress // unbuffered channel the snapshots are sent on; nil once it has been closed
closed bool // set when the transfer was stopped or the final snapshot was sent
startTime time.Time // set on the first update that gets past the throttle
lastSent time.Time // time of the last successful non-blocking send; drives the throttle
updatesW []int64 // ring buffer (timeSlots entries) of the transferred byte count per sample
updatesT []time.Time // ring buffer (timeSlots entries) of the matching sample times
ts int // number of samples recorded so far; ts%timeSlots is the next ring slot
}
// String renders the progress as a single human-readable line.
//
// The transferred amount and the elapsed time are always shown. Current and
// average speed are shown only when they are positive. When the total size is
// known (TotalSize > 0) the line additionally starts with the percentage,
// shows transferred/total, and appends the remaining time if it is not
// negative.
func (p *Progress) String() string {
	// The parenthesis opened here is closed by the final format string, so
	// that the remaining time can be appended inside the same group.
	elapsed := fmt.Sprintf(" (Time: %s", FormatDuration(time.Since(p.StartTime)))

	// Assemble the speed group: current speed, average speed, or both.
	hasCurrent, hasAverage := p.Speed > 0, p.SpeedAvg > 0
	var speed string
	if hasCurrent {
		speed = fmt.Sprintf(" (Speed: %s", FormatSize(IEC, p.Speed, true)) + "/s"
	}
	if hasAverage {
		label := " (Speed AVG: "
		if hasCurrent {
			label = " / AVG: "
		}
		speed += label + FormatSize(IEC, p.SpeedAvg, true) + "/s"
	}
	if hasCurrent || hasAverage {
		speed += ")"
	}

	if p.TotalSize > 0 {
		// Known size: percentage, transferred/total and remaining time.
		var remaining string
		if p.Remaining >= 0 {
			remaining = fmt.Sprintf(" / Remaining: %s", FormatDuration(p.Remaining))
		}
		return fmt.Sprintf("[%02.2f%%] (%s/%s)%s%s%s)",
			p.Percent,
			FormatSize(IEC, p.Transferred, true),
			FormatSize(IEC, p.TotalSize, true),
			speed,
			elapsed,
			remaining,
		)
	}

	// Unknown size: only the amount transferred, the speeds and the time.
	return fmt.Sprintf("%s%s%s)",
		FormatSize(IEC, p.Transferred, true),
		speed,
		elapsed,
	)
}
// mkIoProgress returns a fresh ioProgress for a transfer of the given size.
// It allocates the unbuffered progress channel and the two sample ring
// buffers (timeSlots entries each); every other field starts at its zero
// value.
func mkIoProgress(size int64) *ioProgress {
	p := new(ioProgress)
	p.size = size
	p.ch = make(chan Progress)
	p.updatesW = make([]int64, timeSlots)
	p.updatesT = make([]time.Time, timeSlots)
	return p
}
// updateProgress adds written bytes to the running total (values <= 0 add
// nothing) and, when an update is due, publishes a Progress snapshot on p.ch.
//
// Intermediate snapshots are limited to one per UpdateFreq, whether or not the
// total size is known, and are sent without blocking: when no receiver is
// ready the snapshot is dropped and the next call tries again. The final
// snapshot - produced once the transfer has been closed or the transferred
// byte count equals the announced size - is never throttled; it is sent with a
// blocking send and the channel is closed right after it. Once that has
// happened, further calls are no-ops.
//
// Speed, SpeedAvg and Remaining are reported as -1 until the sample ring
// buffer has filled up, or whenever they cannot be computed.
func (p *ioProgress) updateProgress(written int64) {
	if p.closed && p.ch == nil {
		// The final snapshot was already delivered and the channel is closed.
		return
	}

	if written > 0 {
		p.progress += written
	}

	// The final snapshot must never be skipped: it is the only place where the
	// channel gets closed, and a consumer ranging over the channel would
	// otherwise block forever.
	finished := p.closed || p.progress == p.size

	// Throttle intermediate updates to one per UpdateFreq.
	if !finished && time.Since(p.lastSent) < UpdateFreq {
		return
	}

	now := time.Now()
	if p.startTime.IsZero() {
		p.startTime = now
	}

	prog := Progress{
		StartTime:   p.startTime,
		Transferred: p.progress,
		TotalSize:   p.size,
		Speed:       -1,
		SpeedAvg:    -1,
		Remaining:   -1,
	}

	// Record this sample in the ring buffers. After advancing, ts points at
	// the slot that will be overwritten next, i.e. the oldest retained sample.
	slot := p.ts % timeSlots
	p.updatesW[slot] = p.progress
	p.updatesT[slot] = now
	p.ts++
	oldest := p.ts % timeSlots

	if !p.updatesT[oldest].IsZero() {
		// Current speed: bytes transferred since the oldest retained sample.
		// A zero-length window (coarse clock) would divide by zero, so the
		// speed stays "not available" in that case.
		if window := now.Sub(p.updatesT[oldest]); window > 0 {
			prog.Speed = int64((float64(p.progress-p.updatesW[oldest]) / float64(window)) * float64(time.Second))
		}

		// Average speed since the start of the transfer.
		if total := now.Sub(p.startTime); total > 0 {
			prog.SpeedAvg = int64((float64(p.progress) / float64(total)) * float64(time.Second))
		}

		// The remaining time needs both a known size and a usable average.
		if p.size > 0 && prog.SpeedAvg > 0 {
			prog.Remaining = time.Duration((float64(p.size-p.progress) / float64(prog.SpeedAvg)) * float64(time.Second))
		}
	}

	// Calculate the percentage only if we have a size; truncated to two
	// decimals.
	if p.size > 0 {
		prog.Percent = float64(int64((float64(p.progress)/float64(p.size))*10000.0)) / 100.0
	}

	if finished {
		// EOF or closed: deliver the last message (blocking, so it cannot get
		// lost) and close the channel. The nil check prevents sending it
		// more than once.
		if p.ch != nil {
			prog.StopTime = now
			p.ch <- prog
			p.cleanup()
		}
		return
	}

	// Don't force the send: the channel is unbuffered, so only deliver the
	// snapshot if a receiver is ready right now.
	select {
	case p.ch <- prog:
		p.lastSent = now
	default:
	}
}
// cleanup marks the transfer as closed and closes the progress channel. The
// channel field is reset to nil afterwards, so calling cleanup again does not
// close the channel a second time.
func (p *ioProgress) cleanup() {
	p.closed = true
	if p.ch == nil {
		// Already closed earlier.
		return
	}
	close(p.ch)
	p.ch = nil
}
// stopProgress marks the transfer as closed and runs one more update pass.
// The negative byte count adds nothing to the total, because updateProgress
// only accumulates positive values.
func (p *ioProgress) stopProgress() {
// closed has to be set before the update pass so that the pass sees the
// transfer as stopped.
p.closed = true
p.updateProgress(-1)
}