mirror of
https://github.com/syncthing/syncthing.git
synced 2024-11-15 18:08:45 -07:00
5342bec1b7
This is a symmetric change to #9375 -- where that PR changed the protocol->model interface, this changes the model->protocol one.
100 lines
2.7 KiB
Go
100 lines
2.7 KiB
Go
// Copyright (C) 2024 The Syncthing Authors.
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
|
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
|
|
package model_test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"testing"
|
|
|
|
"github.com/syncthing/syncthing/lib/db"
|
|
"github.com/syncthing/syncthing/lib/model/mocks"
|
|
"github.com/syncthing/syncthing/lib/protocol"
|
|
protomock "github.com/syncthing/syncthing/lib/protocol/mocks"
|
|
"github.com/syncthing/syncthing/lib/testutil"
|
|
)
|
|
|
|
func TestIndexhandlerConcurrency(t *testing.T) {
|
|
// Verify that sending a lot of index update messages using the
|
|
// FileInfoBatch works and doesn't trigger the race detector.
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
ar, aw := io.Pipe()
|
|
br, bw := io.Pipe()
|
|
ci := &protomock.ConnectionInfo{}
|
|
|
|
m1 := &mocks.Model{}
|
|
c1 := protocol.NewConnection(protocol.EmptyDeviceID, ar, bw, testutil.NoopCloser{}, m1, ci, protocol.CompressionNever, nil, nil)
|
|
c1.Start()
|
|
defer c1.Close(io.EOF)
|
|
|
|
m2 := &mocks.Model{}
|
|
c2 := protocol.NewConnection(protocol.EmptyDeviceID, br, aw, testutil.NoopCloser{}, m2, ci, protocol.CompressionNever, nil, nil)
|
|
c2.Start()
|
|
defer c2.Close(io.EOF)
|
|
|
|
c1.ClusterConfig(&protocol.ClusterConfig{})
|
|
c2.ClusterConfig(&protocol.ClusterConfig{})
|
|
c1.Index(ctx, &protocol.Index{Folder: "foo"})
|
|
c2.Index(ctx, &protocol.Index{Folder: "foo"})
|
|
|
|
const msgs = 5e2
|
|
const files = 1e3
|
|
|
|
recvdEntries := 0
|
|
recvdBatches := 0
|
|
var wg sync.WaitGroup
|
|
m2.IndexUpdateCalls(func(_ protocol.Connection, idxUp *protocol.IndexUpdate) error {
|
|
for j := 0; j < files; j++ {
|
|
if n := idxUp.Files[j].Name; n != fmt.Sprintf("f%d-%d", recvdBatches, j) {
|
|
t.Error("wrong filename", n)
|
|
}
|
|
recvdEntries++
|
|
}
|
|
recvdBatches++
|
|
wg.Done()
|
|
return nil
|
|
})
|
|
|
|
b1 := db.NewFileInfoBatch(func(fs []protocol.FileInfo) error {
|
|
return c1.IndexUpdate(ctx, &protocol.IndexUpdate{Folder: "foo", Files: fs})
|
|
})
|
|
sentEntries := 0
|
|
for i := 0; i < msgs; i++ {
|
|
for j := 0; j < files; j++ {
|
|
b1.Append(protocol.FileInfo{
|
|
Name: fmt.Sprintf("f%d-%d", i, j),
|
|
Blocks: []protocol.BlockInfo{{Hash: make([]byte, 32)}},
|
|
})
|
|
sentEntries++
|
|
}
|
|
wg.Add(1)
|
|
if err := b1.Flush(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
// Every sent IndexUpdate should be matched by a corresponding index
|
|
// message on the other side. Use the waitgroup to wait for this to
|
|
// complete, as otherwise the Close below can race with the last
|
|
// outgoing index message and the count between sent and received is
|
|
// wrong.
|
|
wg.Wait()
|
|
|
|
c1.Close(io.EOF)
|
|
c2.Close(io.EOF)
|
|
<-c1.Closed()
|
|
<-c2.Closed()
|
|
|
|
if recvdEntries != sentEntries {
|
|
t.Error("didn't receive all expected messages", recvdEntries, sentEntries)
|
|
}
|
|
}
|