注意:这个问题与其他并发问题不同,因为不仅写入者是单一的,而且写入操作严格只发生一次。
当存在多个并发读取者和一个只写入一次的写入者(例如并发环境中的 setter)时,Go 中应该使用哪种同步方式?
sync.Mutex
适用于这种情况,但是,由于只有一个写入者,因此 sync.RWMutex
甚至更好,因为它比普通互斥锁稍快。
不过,仅仅为了设置一次值,就在应用程序的整个运行期间都进行互斥锁加锁,感觉很浪费。
有没有更快的方法?
package main

import (
	"sync"
)

// RWMutexErrorNotifier forwards error messages to an email-sending callback.
// The callback is installed once by a single writer and read by many
// concurrent callers; an RWMutex guards the field.
type RWMutexErrorNotifier struct {
	rwMutex     sync.RWMutex
	emailSender func(string)
}

// SetEmailSenderService installs the callback used to deliver error
// messages. It takes the write lock for the duration of the assignment.
func (n *RWMutexErrorNotifier) SetEmailSenderService(emailSender func(string)) {
	n.rwMutex.Lock()
	defer n.rwMutex.Unlock()

	n.emailSender = emailSender
}

// SendErrorMessage passes errorMessage to the installed callback while
// holding the read lock. If no callback has been installed yet, the message
// is dropped.
func (n *RWMutexErrorNotifier) SendErrorMessage(errorMessage string) {
	n.rwMutex.RLock()
	defer n.rwMutex.RUnlock()

	if send := n.emailSender; send != nil {
		send(errorMessage)
	}
}

func main() {
	notifier := &RWMutexErrorNotifier{}
	errorsCount := 100_000

	emailSender := func(emailMessage string) {
		// sending email...
	}

	// The WaitGroup exists only so the demo waits for every reader.
	var readers sync.WaitGroup
	readers.Add(errorsCount)
	for sent := 0; sent < errorsCount; sent++ {
		go func() {
			defer readers.Done()
			notifier.SendErrorMessage("ALARM!")
		}()
	}

	// The one and only write, racing with the readers started above.
	notifier.SetEmailSenderService(emailSender)
	readers.Wait()
}
感谢 @peter-cordes 的更新 - 请参阅答案下方的评论
显然,对于单个写入者的单次写入,读取和设置 atomic.Bool
比 sync.RWMutex
更快。它仅比不加锁、存在数据竞争的方案(仅用于基准测试)稍慢。
基准测试结果(基于 sync.Mutex
的解决方案同样仅用于基准测试目的):
$ go test -run=xxx -bench=. -benchmem -benchtime=1000000x
goos: darwin
goarch: arm64
pkg: untitled1
BenchmarkRacyNoGoodConcurrently-12                   1000000    3529 ns/op     384 B/op    12 allocs/op
BenchmarkAtomicBooleanConcurrently-12                1000000    3494 ns/op     384 B/op    12 allocs/op
BenchmarkRWMutexConcurrently-12                      1000000    3909 ns/op     384 B/op    12 allocs/op
BenchmarkMutexConcurrently-12                        1000000    4180 ns/op     384 B/op    12 allocs/op
BenchmarkRacyNoGoodSequentiallyAllCores-12           1000000    3.661 ns/op      0 B/op     0 allocs/op
BenchmarkAtomicBooleanSequentiallyAllCores-12        1000000    3.748 ns/op      0 B/op     0 allocs/op
BenchmarkRWMutexSequentiallyAllCores-12              1000000    1934 ns/op       0 B/op     0 allocs/op
BenchmarkMutexSequentiallyAllCores-12                1000000    1486 ns/op       0 B/op     0 allocs/op
BenchmarkRacyNoGoodSequentiallySingleCore-12         1000000    28.95 ns/op      0 B/op     0 allocs/op
BenchmarkAtomicBooleanSequentiallySingleCore-12      1000000    29.54 ns/op      0 B/op     0 allocs/op
BenchmarkRWMutexSequentiallySingleCore-12            1000000    188.6 ns/op      0 B/op     0 allocs/op
BenchmarkMutexSequentiallySingleCore-12              1000000    187.4 ns/op      0 B/op     0 allocs/op
PASS
ok      untitled1    19.093s
package main

import (
	"runtime"
	"sync"
	"sync/atomic"
)

// ErrorNotifier is the behavior shared by every implementation compared
// below: a callback is installed once and then invoked for each error
// message.
type ErrorNotifier interface {
	SetEmailSenderService(func(string))
	SendErrorMessage(string)
}

// Mutex

// MutexErrorNotifier guards the callback with a plain sync.Mutex, so
// readers are serialized against each other as well as against the writer.
type MutexErrorNotifier struct {
	mutex       sync.Mutex
	emailSender func(string)
}

var _ ErrorNotifier = (*MutexErrorNotifier)(nil)

// SetEmailSenderService installs the callback under the mutex.
func (a *MutexErrorNotifier) SetEmailSenderService(emailSender func(string)) {
	a.mutex.Lock()
	defer a.mutex.Unlock()
	a.emailSender = emailSender
}

// SendErrorMessage invokes the callback, if one is installed, while holding
// the mutex.
func (a *MutexErrorNotifier) SendErrorMessage(errorMessage string) {
	a.mutex.Lock()
	defer a.mutex.Unlock()
	if a.emailSender != nil {
		a.emailSender(errorMessage)
	}
}

// RWMutex

// RWMutexErrorNotifier guards the callback with a sync.RWMutex: the writer
// takes the exclusive lock, readers take the shared lock.
type RWMutexErrorNotifier struct {
	rwMutex     sync.RWMutex
	emailSender func(string)
}

var _ ErrorNotifier = (*RWMutexErrorNotifier)(nil)

// SetEmailSenderService installs the callback under the write lock.
func (a *RWMutexErrorNotifier) SetEmailSenderService(emailSender func(string)) {
	a.rwMutex.Lock()
	defer a.rwMutex.Unlock()
	a.emailSender = emailSender
}

// SendErrorMessage invokes the callback, if one is installed, while holding
// the read lock.
func (a *RWMutexErrorNotifier) SendErrorMessage(errorMessage string) {
	a.rwMutex.RLock()
	defer a.rwMutex.RUnlock()
	if a.emailSender != nil {
		a.emailSender(errorMessage)
	}
}

// Atomic boolean

// AtomicBooleanErrorNotifier publishes the callback through an atomic flag
// instead of a lock. It is only intended for a single writer that writes
// once.
type AtomicBooleanErrorNotifier struct {
	emailerIsSet atomic.Bool
	emailSender  func(string)
}

var _ ErrorNotifier = (*AtomicBooleanErrorNotifier)(nil)

// SetEmailSenderService stores the callback and then raises the flag.
func (a *AtomicBooleanErrorNotifier) SetEmailSenderService(emailSender func(string)) {
	a.emailSender = emailSender
	// The flag must be stored AFTER the field is written: a reader that
	// observes true is then guaranteed to also observe the callback.
	a.emailerIsSet.Store(true)
}

// SendErrorMessage invokes the callback only once the flag has been raised;
// before that the message is dropped.
func (a *AtomicBooleanErrorNotifier) SendErrorMessage(errorMessage string) {
	if a.emailerIsSet.Load() {
		a.emailSender(errorMessage)
	}
}

// Not a solution: racy no-locking variant - just for benchmarking

// RacyNoGoodErrorNotifier performs no synchronization at all. It contains a
// data race and exists only as a baseline for the benchmarks.
type RacyNoGoodErrorNotifier struct {
	emailSender func(string)
}

var _ ErrorNotifier = (*RacyNoGoodErrorNotifier)(nil)

// SetEmailSenderService writes the callback without synchronization.
func (a *RacyNoGoodErrorNotifier) SetEmailSenderService(emailSender func(string)) {
	a.emailSender = emailSender
}

// SendErrorMessage reads and invokes the callback without synchronization.
func (a *RacyNoGoodErrorNotifier) SendErrorMessage(errorMessage string) {
	if a.emailSender != nil {
		a.emailSender(errorMessage)
	}
}

// Demo run

// Run modes accepted by Run.
const (
	allConcurrent        = "all concurrent"
	sequentialSingleCore = "sequential single core"
	sequentialAllCores   = "sequential all cores"
)

// Run starts reader goroutines that call runner.SendErrorMessage, then
// installs a no-op email sender exactly once and waits for every reader to
// finish. Every mode issues n * runtime.NumCPU() sends in total:
//
//   - allConcurrent: one goroutine per send;
//   - sequentialAllCores: one goroutine per CPU, each sending n times;
//   - sequentialSingleCore: a single goroutine performing all sends.
//
// Run panics if runType is not one of the modes above.
func Run(n int, runner ErrorNotifier, runType string) {
	emailSender := func(emailMessage string) {
		// sending email...
	}

	cpus := runtime.NumCPU()

	var wg sync.WaitGroup
	switch runType {
	case allConcurrent:
		wg.Add(n * cpus)
		for i := 0; i < n*cpus; i++ {
			go func() {
				runner.SendErrorMessage("alarm!")
				wg.Done()
			}()
		}
	case sequentialAllCores:
		wg.Add(cpus)
		for i := 0; i < cpus; i++ {
			go func() {
				for j := 0; j < n; j++ {
					runner.SendErrorMessage("alarm!")
				}
				wg.Done()
			}()
		}
	case sequentialSingleCore:
		wg.Add(1)
		go func() {
			for j := 0; j < n*cpus; j++ {
				runner.SendErrorMessage("alarm!")
			}
			wg.Done()
		}()
	default:
		panic("unknown mode")
	}

	// The single write, issued while the readers are already running.
	runner.SetEmailSenderService(emailSender)
	wg.Wait()
}
基准:
package main import "testing" func BenchmarkRacyNoGoodConcurrently(b *testing.B) { Run(b.N, &RacyNoGoodErrorNotifier{}, allConcurrent) } func BenchmarkAtomicBooleanConcurrently(b *testing.B) { Run(b.N, &AtomicBooleanErrorNotifier{}, allConcurrent) } func BenchmarkRWMutexConcurrently(b *testing.B) { Run(b.N, &RWMutexErrorNotifier{}, allConcurrent) } func BenchmarkMutexConcurrently(b *testing.B) { Run(b.N, &MutexErrorNotifier{}, allConcurrent) } func BenchmarkRacyNoGoodSequentiallyAllCores(b *testing.B) { Run(b.N, &RacyNoGoodErrorNotifier{}, sequentialAllCores) } func BenchmarkAtomicBooleanSequentiallyAllCores(b *testing.B) { Run(b.N, &AtomicBooleanErrorNotifier{}, sequentialAllCores) } func BenchmarkRWMutexSequentiallyAllCores(b *testing.B) { Run(b.N, &RWMutexErrorNotifier{}, sequentialAllCores) } func BenchmarkMutexSequentiallyAllCores(b *testing.B) { Run(b.N, &MutexErrorNotifier{}, sequentialAllCores) } func BenchmarkRacyNoGoodSequentiallySingleCore(b *testing.B) { Run(b.N, &RacyNoGoodErrorNotifier{}, sequentialSingleCore) } func BenchmarkAtomicBooleanSequentiallySingleCore(b *testing.B) { Run(b.N, &AtomicBooleanErrorNotifier{}, sequentialSingleCore) } func BenchmarkRWMutexSequentiallySingleCore(b *testing.B) { Run(b.N, &RWMutexErrorNotifier{}, sequentialSingleCore) } func BenchmarkMutexSequentiallySingleCore(b *testing.B) { Run(b.N, &MutexErrorNotifier{}, sequentialSingleCore) }