lib/connections: Use adaptive write size for rate limited connections (fixes #8630) (#8631)

This commit is contained in:
Jakob Borg 2022-11-03 15:44:46 +01:00 committed by GitHub
parent d8296ce111
commit 413c8cf4ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 28 additions and 11 deletions

View File

@ -38,7 +38,6 @@ type waiter interface {
const ( const (
limiterBurstSize = 4 * 128 << 10 limiterBurstSize = 4 * 128 << 10
maxSingleWriteSize = 8 << 10
) )
func newLimiter(myId protocol.DeviceID, cfg config.Wrapper) *limiter { func newLimiter(myId protocol.DeviceID, cfg config.Wrapper) *limiter {
@ -251,10 +250,20 @@ func (w *limitedWriter) Write(buf []byte) (int, error) {
} }
// This does (potentially) multiple smaller writes in order to be less // This does (potentially) multiple smaller writes in order to be less
// bursty with large writes and slow rates. // bursty with large writes and slow rates. At the same time we don't
// want to do hilarious amounts of tiny writes when the rate is high, so
// try to be a bit adaptable. We range from the minimum write size of 1
// KiB up to the limiter burst size, aiming for about a write every
// 10ms.
singleWriteSize := int(w.waiter.Limit() / 100) // 10ms worth of data
singleWriteSize = ((singleWriteSize / 1024) + 1) * 1024 // round up to the next kibibyte
if singleWriteSize > limiterBurstSize {
singleWriteSize = limiterBurstSize
}
written := 0 written := 0
for written < len(buf) { for written < len(buf) {
toWrite := maxSingleWriteSize toWrite := singleWriteSize
if toWrite > len(buf)-written { if toWrite > len(buf)-written {
toWrite = len(buf) - written toWrite = len(buf) - written
} }
@ -294,7 +303,7 @@ func (w waiterHolder) take(tokens int) {
// into the lower level reads so we might get a large amount of data and // into the lower level reads so we might get a large amount of data and
// end up in the loop further down. // end up in the loop further down.
if tokens < limiterBurstSize { if tokens <= limiterBurstSize {
// Fast path. We won't get an error from WaitN as we don't pass a // Fast path. We won't get an error from WaitN as we don't pass a
// context with a deadline. // context with a deadline.
_ = w.waiter.WaitN(context.TODO(), tokens) _ = w.waiter.WaitN(context.TODO(), tokens)

View File

@ -218,7 +218,7 @@ func TestLimitedWriterWrite(t *testing.T) {
// A buffer with random data that is larger than the write size and not // A buffer with random data that is larger than the write size and not
// a precise multiple either. // a precise multiple either.
src := make([]byte, int(12.5*maxSingleWriteSize)) src := make([]byte, int(12.5*8192))
if _, err := crand.Reader.Read(src); err != nil { if _, err := crand.Reader.Read(src); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -242,9 +242,14 @@ func TestLimitedWriterWrite(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
// Verify there were lots of writes and that the end result is identical. // Verify there were lots of writes (we expect one kilobyte write size
if cw.writeCount != 13 { // for the very low rate in this test) and that the end result is
t.Error("expected lots of smaller writes, but not too many") // identical.
if cw.writeCount < 10*8 {
t.Error("expected lots of smaller writes")
}
if cw.writeCount > 15*8 {
t.Error("expected fewer larger writes")
} }
if !bytes.Equal(src, dst.Bytes()) { if !bytes.Equal(src, dst.Bytes()) {
t.Error("results should be equal") t.Error("results should be equal")
@ -319,8 +324,11 @@ func TestLimitedWriterWrite(t *testing.T) {
} }
// Verify there were lots of writes and that the end result is identical. // Verify there were lots of writes and that the end result is identical.
if cw.writeCount != 13 { if cw.writeCount < 10*8 {
t.Error("expected just the one write") t.Error("expected lots of smaller writes")
}
if cw.writeCount > 15*8 {
t.Error("expected fewer larger writes")
} }
if !bytes.Equal(src, dst.Bytes()) { if !bytes.Equal(src, dst.Bytes()) {
t.Error("results should be equal") t.Error("results should be equal")