sync

package
v0.0.0-...-0718798 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 31, 2025 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package sync provides synchronization primitives not found in the standard library sync package. Some types are aliases of sync types to allow this library to be used as a drop-in with extras.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func OnceFunc

func OnceFunc(f func()) func()

OnceFunc returns a function that invokes f only once. The returned function may be called concurrently.

If f panics, the returned function will panic with the same value on every call.

This is an alias of https://pkg.go.dev/sync#OnceFunc.

func OnceValue

func OnceValue[T any](f func() T) func() T

OnceValue returns a function that invokes f only once and returns the value returned by f. The returned function may be called concurrently.

If f panics, the returned function will panic with the same value on every call.

This is an alias of https://pkg.go.dev/sync#OnceValue.

func OnceValues

func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)

OnceValues returns a function that invokes f only once and returns the values returned by f. The returned function may be called concurrently.

If f panics, the returned function will panic with the same value on every call.

This is an alias of https://pkg.go.dev/sync#OnceValues.

Types

type Chan

type Chan[T any] struct {
	// contains filtered or unexported fields
}

A Chan is a channel of type T, synchronized with lock-free atomic operations.

The initial value is nil, requiring a call to a method that initializes a channel, such as Chan.New or Chan.Reset. If lazy initialization is needed, LazyChan can be used.

Since Chan uses atomic.Pointer to store the underlying channel, it inherits the same restrictions disallowing copying and spurious type conversion.

Example
package main

import (
	"fmt"

	"go.chrisrx.dev/x/sync"
)

func main() {
	var ch sync.Chan[string]
	c := ch.New(0)

	go func() {
		defer ch.Close()
		for range 5 {
			c <- "done"
		}
	}()

	for v := range c {
		fmt.Println(v)
	}
}
Output:

done
done
done
done
done
Example (Buffered)
package main

import (
	"fmt"

	"go.chrisrx.dev/x/sync"
)

func main() {
	var ch sync.Chan[int]
	ch.New(1) <- 10  // initialize and write to buffered channel
	v := <-ch.Load() // atomically load new channel and read
	fmt.Println(v)
}
Output:

10
Example (NewChan)
package main

import (
	"fmt"

	"go.chrisrx.dev/x/sync"
)

func main() {
	ch := sync.NewChan[int](0).Load()

	go func() {
		defer close(ch)
		ch <- 10
		ch <- 20
		ch <- 30
	}()

	for v := range ch {
		fmt.Println(v)
	}
}
Output:

10
20
30
Example (Reset)
package main

import (
	"fmt"

	"go.chrisrx.dev/x/sync"
)

func main() {
	ch := sync.NewChan[int](10)
	ch.Load() <- 10
	ch.Reset() <- 20 // reset without reading, and send new value
	v := <-ch.Recv()
	fmt.Println(v)
}
Output:

20
Example (Send)
package main

import (
	"fmt"
	"time"

	"go.chrisrx.dev/x/sync"
)

func main() {
	ch := sync.NewChan[int](0)

	sent := ch.TrySend(10) // will timeout
	time.Sleep(200 * time.Millisecond)
	fmt.Println(sent)

	go func() {
		defer ch.Close()

		ch.Send(20, 30)
	}()

	for v := range ch.Recv() {
		fmt.Println(v)
	}
}
Output:

false
20
30

func NewChan

func NewChan[T any](capacity int) *Chan[T]

NewChan constructs a new Chan of type T. If capacity is greater than zero, it is initialized with a buffered channel, otherwise the channel is unbuffered.

func (*Chan[T]) Cap

func (ch *Chan[T]) Cap() int

Cap returns the current channel capacity.

func (*Chan[T]) Close

func (ch *Chan[T]) Close()

Close closes the current channel, if present. The stored value is replaced with a nil.

func (*Chan[T]) CloseAndRecv

func (ch *Chan[T]) CloseAndRecv() <-chan T

CloseAndRecv closes the stored channel and returns a results channel with the remaining elements.

func (*Chan[T]) Closed

func (ch *Chan[T]) Closed() bool

Closed returns true when the current channel is closed.

func (*Chan[T]) Load

func (ch *Chan[T]) Load() chan T

Load loads the stored channel. If no channel is stored, a closed channel is returned.

func (*Chan[T]) LoadOrNew

func (ch *Chan[T]) LoadOrNew() (_ chan T, isNew bool)

LoadOrNew loads the stored channel, if present. If not present, a new channel will be created and the newly stored channel is returned.

func (*Chan[T]) New

func (ch *Chan[T]) New(capacity int) chan T

New constructs and stores a new channel. If a channel is already stored, it is closed after being replaced.

If capacity is set to zero, then the channel is unbuffered. If capacity is greater than zero, then a buffered channel with the given capacity will be created.

func (*Chan[T]) Recv

func (ch *Chan[T]) Recv() <-chan T

Recv returns the stored channel as receive-only.

func (*Chan[T]) Reset

func (ch *Chan[T]) Reset() chan T

Reset constructs and stores a new channel. It is the same as calling [New] with the current capacity value.

This is useful when needing to signal that channel consumers should close, while establishing a new channel for immediate use.

func (*Chan[T]) Send

func (ch *Chan[T]) Send(messages ...T)

Send sends the provided messages on the stored channel. The messages are sent sequentially in the order they are provided.

Send has the same behavior expected as directly using a Go channel. If the stored channel is unbuffered, calls to Send will block until a reader is receives the message. A buffered channel will send immedidately up to the available capacity.

If the stored channel is uninitialized or closed, it returns immediately.

func (*Chan[T]) TrySend

func (ch *Chan[T]) TrySend(messages ...T) (sent bool)

TrySend attempts to send a value on the stored channel. The messages are sent sequentially in the order they are provided.

Unlike Chan.Send, it will wait for the value to be sent for 100ms before returning. If the channel is closed while attempting to send a value, the send on closed panic is recovered and logged.

If the stored channel is uninitialized or closed, it returns immediately.

type Cond

type Cond = sync.Cond

Cond implements a condition variable, a rendezvous point for goroutines waiting for or announcing the occurrence of an event.

Each Cond has an associated Locker L (often a *Mutex or *RWMutex), which must be held when changing the condition and when calling the [Cond.Wait] method.

A Cond must not be copied after first use.

In the terminology of the Go memory model, Cond arranges that a call to [Cond.Broadcast] or [Cond.Signal] “synchronizes before” any Wait call that it unblocks.

For many simple use cases, users will be better off using channels than a Cond (Broadcast corresponds to closing a channel, and Signal corresponds to sending on a channel).

For more on replacements for sync.Cond, see Roberto Clapis's series on advanced concurrency patterns, as well as Bryan Mills's talk on concurrency patterns.

This is an alias of https://pkg.go.dev/sync#Cond.

func NewCond

func NewCond(l Locker) *Cond

NewCond returns a new Cond with Locker l.

This is an alias of https://pkg.go.dev/sync#NewCond.

type LazyChan

type LazyChan[T any] struct {
	Chan[T]
	// contains filtered or unexported fields
}

LazyChan wraps a Chan, initializing lazily upon initial access.

Example
package main

import (
	"fmt"

	"go.chrisrx.dev/x/sync"
)

func main() {
	var ch sync.LazyChan[string]

	go func() {
		defer ch.Close()
		for range 5 {
			ch.Load() <- "done"
		}
	}()

	for v := range ch.Load() {
		fmt.Println(v)
	}
}
Output:

done
done
done
done
done

func (*LazyChan[T]) Close

func (l *LazyChan[T]) Close()

Close closes the current channel, if present. The stored value is replaced with a nil.

If the underlying channel hasn't been initialized yet, then one will be created and stored before ultimately being closed. This behavior is necessary since the usage of LazyChan is predicated upon the underlying channel being initialized upon first usage, even if that usage is being closed.

func (*LazyChan[T]) Load

func (l *LazyChan[T]) Load() chan T

Load atomically loads the stored channel.

If no channel is currently stored, one will be created and stored.

func (*LazyChan[T]) Recv

func (l *LazyChan[T]) Recv() <-chan T

func (*LazyChan[T]) Send

func (l *LazyChan[T]) Send(messages ...T)

func (*LazyChan[T]) TrySend

func (l *LazyChan[T]) TrySend(messages ...T) (sent bool)

type Locker

type Locker = sync.Locker

A Locker represents an object that can be locked and unlocked.

This is an alias of https://pkg.go.dev/sync#Locker.

type Map

type Map = sync.Map

Map is like a Go map[any]any but is safe for concurrent use by multiple goroutines without additional locking or coordination. Loads, stores, and deletes run in amortized constant time.

The Map type is specialized. Most code should use a plain Go map instead, with separate locking or coordination, for better type safety and to make it easier to maintain other invariants along with the map content.

The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys. In these two cases, use of a Map may significantly reduce lock contention compared to a Go map paired with a separate Mutex or RWMutex.

The zero Map is empty and ready for use. A Map must not be copied after first use.

In the terminology of the Go memory model, Map arranges that a write operation “synchronizes before” any read operation that observes the effect of the write, where read and write operations are defined as follows. [Map.Load], [Map.LoadAndDelete], [Map.LoadOrStore], [Map.Swap], [Map.CompareAndSwap], and [Map.CompareAndDelete] are read operations; [Map.Delete], [Map.LoadAndDelete], [Map.Store], and [Map.Swap] are write operations; [Map.LoadOrStore] is a write operation when it returns loaded set to false; [Map.CompareAndSwap] is a write operation when it returns swapped set to true; and [Map.CompareAndDelete] is a write operation when it returns deleted set to true.

This is an alias of https://pkg.go.dev/sync#Map.

type Mutex

type Mutex = sync.Mutex

A Mutex is a mutual exclusion lock. The zero value for a Mutex is an unlocked mutex.

A Mutex must not be copied after first use.

In the terminology of the Go memory model, the n'th call to [Mutex.Unlock] “synchronizes before” the m'th call to [Mutex.Lock] for any n < m. A successful call to [Mutex.TryLock] is equivalent to a call to Lock. A failed call to TryLock does not establish any “synchronizes before” relation at all.

This is an alias of https://pkg.go.dev/sync#Mutex.

type Once

type Once struct {
	// contains filtered or unexported fields
}

Once is a resettable version of sync.Once.

func (*Once) Do

func (o *Once) Do(f func())

func (*Once) Reset

func (o *Once) Reset()

Reset resets the finished state, allowing reuse.

type Pool

type Pool = sync.Pool

A Pool is a set of temporary objects that may be individually saved and retrieved.

Any item stored in the Pool may be removed automatically at any time without notification. If the Pool holds the only reference when this happens, the item might be deallocated.

A Pool is safe for use by multiple goroutines simultaneously.

Pool's purpose is to cache allocated but unused items for later reuse, relieving pressure on the garbage collector. That is, it makes it easy to build efficient, thread-safe free lists. However, it is not suitable for all free lists.

An appropriate use of a Pool is to manage a group of temporary items silently shared among and potentially reused by concurrent independent clients of a package. Pool provides a way to amortize allocation overhead across many clients.

An example of good use of a Pool is in the fmt package, which maintains a dynamically-sized store of temporary output buffers. The store scales under load (when many goroutines are actively printing) and shrinks when quiescent.

On the other hand, a free list maintained as part of a short-lived object is not a suitable use for a Pool, since the overhead does not amortize well in that scenario. It is more efficient to have such objects implement their own free list.

A Pool must not be copied after first use.

In the terminology of the Go memory model, a call to Put(x) “synchronizes before” a call to [Pool.Get] returning that same value x. Similarly, a call to New returning x “synchronizes before” a call to Get returning that same value x.

This is an alias of https://pkg.go.dev/sync#Pool.

type RWMutex

type RWMutex = sync.RWMutex

A RWMutex is a reader/writer mutual exclusion lock. The lock can be held by an arbitrary number of readers or a single writer. The zero value for a RWMutex is an unlocked mutex.

A RWMutex must not be copied after first use.

If any goroutine calls [RWMutex.Lock] while the lock is already held by one or more readers, concurrent calls to [RWMutex.RLock] will block until the writer has acquired (and released) the lock, to ensure that the lock eventually becomes available to the writer. Note that this prohibits recursive read-locking. A [RWMutex.RLock] cannot be upgraded into a [RWMutex.Lock], nor can a [RWMutex.Lock] be downgraded into a [RWMutex.RLock].

In the terminology of the Go memory model, the n'th call to [RWMutex.Unlock] “synchronizes before” the m'th call to Lock for any n < m, just as for Mutex. For any call to RLock, there exists an n such that the n'th call to Unlock “synchronizes before” that call to RLock, and the corresponding call to [RWMutex.RUnlock] “synchronizes before” the n+1'th call to Lock.

This is an alias of https://pkg.go.dev/sync#RWMutex.

type Semaphore

type Semaphore struct {
	// contains filtered or unexported fields
}

Semaphore is a weighted semaphore implementation built around Chan.

Example
package main

import (
	"fmt"
	"time"

	"go.chrisrx.dev/x/sync"
)

func main() {
	var sem sync.Semaphore
	sem.SetLimit(2)

	go func() {
		defer sem.Release()
		time.Sleep(100 * time.Millisecond)
	}()

	sem.Acquire(3)
	fmt.Println("done")
}
Output:

done

func NewSemaphore

func NewSemaphore(n int) *Semaphore

NewSemaphore constructs a new semaphore using the provided size.

func (*Semaphore) Acquire

func (s *Semaphore) Acquire(n int)

Acquire acquires a semaphore of weight n. If the size given was zero, this operation is a nop.

The weight provided cannot be greater than the semaphore capacity. If it is, the semaphore capacity is used instead.

func (*Semaphore) Release

func (s *Semaphore) Release()

Release releases a semaphore. If the size given was zero this operation is a nop.

func (*Semaphore) ReleaseN

func (s *Semaphore) ReleaseN(n int)

ReleaseN releases a semaphore with the provided weight. If the size given was zero this operation is a nop.

func (*Semaphore) SetLimit

func (s *Semaphore) SetLimit(n int)

SetLimit sets the limit for the semaphore.

func (*Semaphore) TryAcquire

func (s *Semaphore) TryAcquire(n int) bool

TryAcquire attempts to acquire a semaphore of weight n. It returns immediately if the semaphore cannot be acquired. If the size given was zero, this operation is a nop.

The weight provided cannot be greater than the semaphore capacity. If it is, the semaphore capacity is used instead.

type SeqChan

type SeqChan[T any] struct {
	Chan[T]
}

SeqChan wraps a Chan, providing Send/Recv methods that operate on iterator sequences.

Example
package main

import (
	"fmt"
	"slices"

	"go.chrisrx.dev/x/sync"
)

func main() {
	ch := sync.NewSeqChan[int](0)

	go func() {
		defer ch.Close()

		ch.Send(slices.Values([]int{10, 20, 30}))
	}()

	for v := range ch.Recv() {
		fmt.Println(v)
	}
}
Output:

10
20
30

func NewSeqChan

func NewSeqChan[T any](capacity int) *SeqChan[T]

NewSeqChan constructs a new SeqChan of type T with the provided capacity.

func (*SeqChan[T]) Recv

func (s *SeqChan[T]) Recv() iter.Seq[T]

RecvSeq reads values from the stored channel and returns as an iterator.

func (*SeqChan[T]) Send

func (s *SeqChan[T]) Send(seq iter.Seq[T])

Send works like Chan.Send but accepts a sequence of values.

func (*SeqChan[T]) TrySend

func (s *SeqChan[T]) TrySend(seq iter.Seq[T]) (sent bool)

TrySend attempts to send a sequence of values on the stored channel.

type WaitGroup

type WaitGroup struct {
	// contains filtered or unexported fields
}

A WaitGroup works like a sync.WaitGroup with a semaphore that limits concurrency.

Example
package main

import (
	"fmt"

	"go.chrisrx.dev/x/sync"
)

func main() {
	var wg sync.WaitGroup
	wg.SetLimit(2)

	for range 5 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println("done")
		}()
	}

	wg.Wait()
}
Output:

done
done
done
done
done

func (*WaitGroup) Add

func (w *WaitGroup) Add(delta int)

Add adds delta, which may be negative, to the WaitGroup counter.

func (*WaitGroup) Done

func (w *WaitGroup) Done()

Done decrements the WaitGroup counter by one.

func (*WaitGroup) SetLimit

func (w *WaitGroup) SetLimit(n int)

SetLimit sets the bounds for concurrency to the provided value.

func (*WaitGroup) TryAdd

func (w *WaitGroup) TryAdd(delta int) bool

TryAdd attempts to add delta to the WaitGroup counter. If a concurrency limit is set and the underlying semaphore cannot be acquired immediately, this returns without adding to the WaitGroup.

func (*WaitGroup) Wait

func (w *WaitGroup) Wait()

Wait blocks until the WaitGroup counter is zero.

type Waiter

type Waiter struct {
	// contains filtered or unexported fields
}

A Waiter uses a LazyChan to wait for an event to occur.

Example
package main

import (
	"fmt"
	"time"

	"go.chrisrx.dev/x/sync"
)

func main() {
	var ready sync.Waiter

	go func() {
		defer ready.Done()
		time.Sleep(100 * time.Millisecond)
	}()

	ready.Wait()
	fmt.Println("done")
}
Output:

done
Example (Reset)
package main

import (
	"fmt"
	"time"

	"go.chrisrx.dev/x/sync"
)

func main() {
	var ready sync.Waiter

	go func() {
		defer ready.Done()
		time.Sleep(100 * time.Millisecond)
	}()

	ready.Wait()
	fmt.Println("done")

	ready.Reset()

	go func() {
		defer ready.Done()
		time.Sleep(100 * time.Millisecond)
	}()

	ready.Wait()
	fmt.Println("done")
}
Output:

done
done

func (*Waiter) C

func (w *Waiter) C() <-chan empty

func (*Waiter) Done

func (w *Waiter) Done()

Done sends a done signal for any waiters currently blocked.

func (*Waiter) Reset

func (w *Waiter) Reset()

Reset resets the waiter to allow reuse. Any waiters currently blocked will release immediately.

func (*Waiter) Wait

func (w *Waiter) Wait()

Wait blocks until receiving a done signal.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL