aboutsummaryrefslogtreecommitdiff
path: root/pipe.go
blob: 08eaa23918e4a1a8359cd61d27dffbed4e5beef8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package pipe

import (
	"sync"
)

// SinkChanError is the error channel used by the helpers in this package
// that do not take an explicit error channel. It is created in init, which
// also starts a goroutine that receives and discards everything sent on it.
var SinkChanError chan error

// init allocates SinkChanError and launches the goroutine that drains it, so
// sends on the channel complete instead of blocking on the unbuffered channel.
func init() {
	SinkChanError = make(chan error)
	go func() {
		sink := SinkChanError
		for {
			// Drop every received error; stop once the channel is closed.
			if _, open := <-sink; !open {
				return
			}
		}
	}()
}

// Proc maps cin into a channel by applying proc and outputs to its return.
func Proc[T any, V any](cin <-chan T, count int, proc func(T) (V, error)) <-chan V {
	return ProcWithError(cin, SinkChanError, count, proc)
}

// ProcWithError applies proc to every value received from cin and returns a
// channel carrying the successful results.
//
// The work is spread over count goroutines that all read from cin; a count
// below 1 is treated as 1 so that cin is always consumed. When proc returns a
// non-nil error the error is sent on cerr and no value is emitted for that
// input. With more than one worker the order of the results is not
// guaranteed to match the order of the inputs.
//
// The returned channel is closed once cin has been closed and every worker
// has finished. cerr is never closed by this function; the caller must keep
// receiving from it (or give it enough buffer), otherwise a worker blocks on
// the send.
func ProcWithError[T any, V any](cin <-chan T, cerr chan<- error, count int, proc func(T) (V, error)) <-chan V {
	// Without at least one worker the output would be closed right away while
	// cin is left undrained, blocking whoever is producing into it.
	if count < 1 {
		count = 1
	}

	cout := make(chan V)
	var wg sync.WaitGroup

	wg.Add(count)
	for w := 0; w < count; w++ {
		go func() {
			defer wg.Done()
			for item := range cin {
				v, err := proc(item)
				if err != nil {
					cerr <- err
					continue
				}
				cout <- v
			}
		}()
	}

	// Close the output only after every worker has stopped sending on it.
	go func() {
		wg.Wait()
		close(cout)
	}()

	return cout
}

// noop accepts a value of any type and does nothing with it.
func noop[T any](T) {}

// Wait blocks until cin is closed, receiving and discarding every value sent
// on it in the meantime. It is meant to terminate a chain of procs whose
// final output is not needed.
func Wait[T any](cin <-chan T) {
	// The values themselves are irrelevant; only draining the channel matters.
	for range cin {
	}
}

// TailProc waits a channel to close by applying proc to it.
// It does not return a channel since its meant to end a chain of goroutine+channel.
func TailProc[T any](cin <-chan T, count int, proc func(T) error) {
	TailProcWithError(cin, SinkChanError, count, proc)
}

// TailProcWithError applies proc to every value received from cin and blocks
// until cin is closed and all of its values have been processed.
// It returns nothing because it is meant to end a chain of goroutines and
// channels.
//
// The work is spread over count goroutines that all read from cin; a count
// below 1 is treated as 1 so that cin is always consumed. Every non-nil error
// returned by proc is sent on cerr. cerr is never closed by this function;
// the caller must keep receiving from it (or give it enough buffer),
// otherwise a worker blocks on the send.
func TailProcWithError[T any](cin <-chan T, cerr chan<- error, count int, proc func(T) error) {
	// Without at least one worker the call would return at once, leaving cin
	// undrained and its producer blocked.
	if count < 1 {
		count = 1
	}

	var wg sync.WaitGroup

	wg.Add(count)
	for w := 0; w < count; w++ {
		go func() {
			defer wg.Done()
			for item := range cin {
				if err := proc(item); err != nil {
					cerr <- err
				}
			}
		}()
	}
	wg.Wait()
}

func TailReduce[T any, V any](cin <-chan T, red func(V, T) (V, error), vinit V) V {
	return TailReduceWithError(cin, SinkChanError, red, vinit)
}

// TailReduceWithError folds the values received from cin into a single
// result and returns it once cin is closed.
//
// The accumulator starts at vinit. For each received value, red is called
// with the current accumulator and the value: on success its result becomes
// the new accumulator; on error the error is sent on cerr and the accumulator
// is left unchanged. The fold runs in the calling goroutine.
func TailReduceWithError[T any, V any](cin <-chan T, cerr chan<- error, red func(V, T) (V, error), vinit V) V {
	result := vinit

	for item := range cin {
		next, err := red(result, item)
		if err != nil {
			cerr <- err
			continue
		}
		result = next
	}

	return result
}

// Yield returns a channel that delivers the elements of in, in slice order.
// The elements are sent from a new goroutine on an unbuffered channel, which
// is closed after the last element has been sent.
func Yield[T any](in []T) <-chan T {
	out := make(chan T)

	go func() {
		defer close(out)
		for _, item := range in {
			out <- item
		}
	}()

	return out
}

// Map applies m to each element of vs and returns the results as a new slice
// of the same length, in the same order. The returned slice is never nil,
// even when vs is nil or empty.
func Map[T any, V any](m func(T) V, vs []T) []V {
	out := make([]V, 0, len(vs))

	for _, item := range vs {
		out = append(out, m(item))
	}

	return out
}