Documentation ¶
Overview ¶
A discipline that distributes data items among handlers in quantities corresponding to the priorities of the data items.
Variables ¶
var (
	ErrDividerBad              = errors.New("divider creates an incorrect distribution")
	ErrDividerEmpty            = errors.New("divider was not specified")
	ErrHandlersQuantityTooSmall = errors.New("quantity of data handlers is too small")
	ErrHandlersQuantityZero    = errors.New("quantity of data handlers is zero")
	ErrInputEmpty              = errors.New("input channel was not specified")
	ErrInputExists             = errors.New("input channel already specified")
	ErrPriorityZero            = errors.New("zero priority is specified")
)
Types ¶
type Discipline ¶
type Discipline[Type any] struct {
	// contains filtered or unexported fields
}
Priority discipline.
Example ¶
package main

import (
	"cmp"
	"fmt"
	"maps"
	"os"
	"slices"
	"strconv"
	"time"

	"github.com/akramarenkov/flow/priority"
	"github.com/akramarenkov/flow/priority/divider"
	"github.com/guptarohit/asciigraph"
)

func main() {
	handlersQuantity := uint(100)
	itemsQuantity := 10000
	// Preferably input channels should be buffered for performance reasons.
	// Optimal capacity is equal to the quantity of data handlers
	inputCapacity := handlersQuantity

	processingDuration := 10 * time.Millisecond
	graphInterval := 100 * time.Millisecond
	graphRangeExtension := 500 * time.Millisecond

	inputs := map[uint]chan int{
		70: make(chan int, inputCapacity),
		20: make(chan int, inputCapacity),
		10: make(chan int, inputCapacity),
	}

	// Used only in this example for measuring receiving of data items
	type measure struct {
		Priority uint
		Time     time.Duration
	}

	compareTime := func(first, second measure) int {
		return cmp.Compare(first.Time, second.Time)
	}

	// Channel capacity is equal to the total quantity of input data in order to
	// minimize delays in collecting measurements
	measurements := make(chan measure, itemsQuantity*len(inputs))

	// For equaling use divider.Fair divider, for prioritization use divider.Rate
	// divider or custom divider
	opts := priority.Opts[int]{
		Divider:          divider.Rate,
		HandlersQuantity: handlersQuantity,
	}

	for prio, channel := range inputs {
		if err := opts.AddInput(prio, channel); err != nil {
			panic(err)
		}
	}

	discipline, err := priority.New(opts)
	if err != nil {
		panic(err)
	}

	// Running writers, that write data items to input channels
	for _, input := range inputs {
		go func() {
			defer close(input)

			for item := range itemsQuantity {
				input <- item
			}
		}()
	}

	startedAt := time.Now()

	// Running handlers, that process data items
	for range handlersQuantity {
		go func() {
			for prioritized := range discipline.Output() {
				// Data item processing
				measurement := measure{
					Priority: prioritized.Priority,
					Time:     time.Since(startedAt),
				}

				time.Sleep(processingDuration)

				measurements <- measurement

				// Handlers must call this method after the current data item has been
				// processed
				discipline.Release(prioritized.Priority)
			}
		}()
	}

	// Waiting for completion of the discipline, and also writers and handlers
	if err := <-discipline.Err(); err != nil {
		fmt.Println("An error was received: ", err)
	}

	graphRange := time.Since(startedAt) + graphRangeExtension

	close(measurements)

	received := make(map[uint][]measure, len(inputs))

	// Receiving measurements
	for item := range measurements {
		received[item.Priority] = append(received[item.Priority], item)
	}

	// Sorting measurements by time for further research
	for _, measurements := range received {
		slices.SortFunc(measurements, compareTime)
	}

	// Calculating quantity of data items received by handlers over time
	quantities := make(map[uint][]float64)

	for span := time.Duration(0); span <= graphRange; span += graphInterval {
		for prio, measurements := range received {
			quantity := float64(0)

			for _, measure := range measurements {
				if measure.Time < span-graphInterval {
					continue
				}

				if measure.Time >= span {
					break
				}

				quantity++
			}

			quantities[prio] = append(quantities[prio], quantity)
		}
	}

	// Preparing research data for plot
	serieses := make([][]float64, 0, len(quantities))
	legends := make([]string, 0, len(quantities))

	// To keep the legends in the same order
	priorities := slices.SortedFunc(maps.Keys(quantities), priority.Compare)

	for _, prio := range priorities {
		serieses = append(serieses, quantities[prio])
		legends = append(legends, strconv.FormatUint(uint64(prio), 10))
	}

	graph := asciigraph.PlotMany(
		serieses,
		asciigraph.Height(10),
		asciigraph.Caption("Quantity of data items received by handlers over time"),
		asciigraph.SeriesColors(asciigraph.Red, asciigraph.Green, asciigraph.Blue),
		asciigraph.SeriesLegends(legends...),
	)

	_, err = fmt.Fprintln(os.Stderr, graph)
	fmt.Println(err)
	fmt.Println("See graph")
}
Output:
<nil>
See graph
func New ¶
func New[Type any](opts Opts[Type]) (*Discipline[Type], error)
Creates and runs a discipline.
func (*Discipline[Type]) Err ¶
func (dsc *Discipline[Type]) Err() <-chan error
Returns a channel of errors. If an error occurs (a non-nil value is received from the channel), the discipline terminates.
A single nil value means that the discipline has terminated normally, after all input channels have been closed and emptied.
The only place where an error can occur is the divider. If you are sure that the divider works correctly and that the configuration used will not cause an error in it, you are not obliged to read from this channel or to check the received value.
func (*Discipline[Type]) Output ¶
func (dsc *Discipline[Type]) Output() <-chan types.Prioritized[Type]
Returns the output channel.
If this channel is closed, it means that the discipline is terminated.
func (*Discipline[Type]) Release ¶
func (dsc *Discipline[Type]) Release(priority uint)
Marks that the current data item has been processed and that the handler is ready to receive a new data item.
Handlers must call this method after the current data item has been processed.
type Opts ¶
type Opts[Type any] struct {
	// Determines in what quantities data items are distributed among data handlers
	//
	// For equal distribution use the divider.Fair divider; for prioritization use
	// the divider.Rate divider or a custom divider
	Divider types.Divider

	// Quantity of data handlers between which data items are distributed
	HandlersQuantity uint

	// Input channels of data items. To terminate the discipline it is necessary and
	// sufficient to close all input channels. Preferably input channels should be
	// buffered for performance reasons. Optimal capacity is equal to the quantity of
	// data handlers
	//
	// Map key is a value of priority. Zero priority is not allowed
	Inputs map[uint]<-chan Type
}
Options of the created discipline.
Directories ¶
| Path | Synopsis |
|---|---|
| divider | Implements several dividers that determine in what quantities data items are distributed among data handlers. |
| inspect | Package used to check dividers for compliance with the requirements for them. |
| internal | |
| distrib | Internal package used to calculate distribution parameters. |
| measuring | Internal package used for measuring and benchmarking the discipline. |
| research | Internal package with research functions that are used for testing. |
| unmanaged | Internal package with the implementation of a discipline that does not distribute data items between handlers in quantity corresponding to the priority of the data items. |
| | Simplified version of the priority discipline that runs handlers on its own. |
| | Data types of the priority discipline. |