In this post I'm going to show a cool pattern using Go channels: channels of channels.
Channels are a very flexible tool in Go. Learning to use them well unlocks a lot of the language's power.
This is not an article that explains what a channel is or when you should use one.
If you want to know more about channels, I suggest reading:
- https://blog.golang.org/share-memory-by-communicating
- https://golang.org/doc/effective_go.html#channels
- https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb
- https://dev.to/search?q=go%20channels
A toy problem
Say you need to run a certain task periodically, for example every second.
But from time to time you need to run it as soon as possible, because you can't wait for the next scheduled run.
A toy example would be a file sync utility, which syncs files between different hosts.
You want to sync the files every x seconds, but occasionally you want to sync on demand; basically forcing the sync outside of the predefined schedule.
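To make the toy problem concrete, here is a minimal sketch of its shape: a single loop that reacts both to a schedule and to an on-demand trigger. The names `runLoop`, `forcedRuns`, `tick`, `force` and `stop` are made up for this sketch, and the task is a no-op standing in for the real sync.

```go
package main

import (
	"fmt"
	"time"
)

// runLoop runs task whenever the schedule ticks or a caller forces a run.
// It returns the number of runs once stop is closed.
func runLoop(tick <-chan time.Time, force <-chan struct{}, stop <-chan struct{}, task func()) int {
	runs := 0
	for {
		select {
		case <-tick:
			task()
			runs++
		case <-force:
			task()
			runs++
		case <-stop:
			return runs
		}
	}
}

// forcedRuns starts the loop with a very long period, forces n on-demand
// runs, stops the loop and returns how many times the task ran.
func forcedRuns(n int) int {
	ticker := time.NewTicker(time.Hour) // the schedule never fires in this demo
	defer ticker.Stop()
	force := make(chan struct{})
	stop := make(chan struct{})
	done := make(chan int)

	go func() {
		done <- runLoop(ticker.C, force, stop, func() {})
	}()

	for i := 0; i < n; i++ {
		force <- struct{}{} // on-demand run, without waiting for the next tick
	}
	close(stop)
	return <-done
}

func main() {
	fmt.Println(forcedRuns(3)) // 3
}
```

In this simple version, whoever sends on `force` learns nothing about how the run went: there is no way to get an error back.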
Channels of channels
This is one way to go about solving the problem, one that makes use of channels of channels.
import (
	"errors"
	"sync/atomic"
	"time"
)

// Worker states, kept in an int32 so they can be read and written atomically.
const (
	running    int32 = 1
	notRunning int32 = 0
)

// Worker runs syncStrategy periodically and on demand.
type Worker struct {
	syncStrategy      func() error
	period            time.Duration
	currentStatus     *int32
	periodicSyncChan  chan int
	outOfBandSyncChan chan chan error // each request carries its own reply channel
	exitChan          chan int
}

// NewSyncWorker builds a Worker that stays idle until Sync is called.
func NewSyncWorker(syncStrategy func() error, period time.Duration) *Worker {
	currentStatus := notRunning
	return &Worker{
		syncStrategy:      syncStrategy,
		period:            period,
		currentStatus:     &currentStatus,
		periodicSyncChan:  make(chan int),
		outOfBandSyncChan: make(chan chan error),
		exitChan:          make(chan int),
	}
}
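// Sync starts the worker loop and blocks until Stop is called.
// It returns an error if the loop is already running or if the first sync fails.
func (s *Worker) Sync() error {
	if !atomic.CompareAndSwapInt32(s.currentStatus, notRunning, running) {
		return errors.New("sync is already running")
	}
	return s.doSync()
}

func (s *Worker) doSync() error {
	if e := s.syncStrategy(); e != nil {
		// Reset the status so that a later Sync call can start the worker again.
		atomic.StoreInt32(s.currentStatus, notRunning)
		return e
	}
	// Schedule the next periodic run; the callback wakes up the loop below.
	timer := time.AfterFunc(s.period, func() {
		s.periodicSyncChan <- 1
	})
	for {
		select {
		case <-s.periodicSyncChan:
			s.syncStrategy() // errors from periodic runs are ignored
			timer.Reset(s.period)
		case responseChannel := <-s.outOfBandSyncChan:
			// Run the sync now and answer on the caller's own channel.
			responseChannel <- s.syncStrategy()
		case <-s.exitChan:
			timer.Stop() // cancel the pending periodic run
			atomic.StoreInt32(s.currentStatus, notRunning)
			return nil
		}
	}
}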
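// OutOfBandSync asks the running worker to sync immediately and returns the result.
// It blocks until the worker loop picks up the request.
func (s *Worker) OutOfBandSync() error {
	responseChannel := make(chan error)
	s.outOfBandSyncChan <- responseChannel
	return <-responseChannel
}

// Stop ends the worker loop. It blocks until the loop receives the signal.
func (s *Worker) Stop() {
	s.exitChan <- 1
}

// IsRunning reports whether the worker loop is active.
func (s *Worker) IsRunning() bool {
	return atomic.LoadInt32(s.currentStatus) == running
}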
The outOfBandSyncChan is a channel of channels.
When the client requests an OutOfBandSync, the function creates a responseChannel.
The responseChannel is sent over the outOfBandSyncChan, and the worker loop uses it to return a value (in this case nil or an error) once the operation has completed.
In other words, we are:
- creating a channel to send back the response to the client
- transferring the newly created channel, through another channel, to the code responsible for the sync
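Stripped of the scheduling logic, the two steps above can be sketched as follows. This is a minimal illustration, not the worker itself: `serve`, `call`, `flaky` and `errFirst` are hypothetical names introduced only for this example.

```go
package main

import (
	"errors"
	"fmt"
)

var errFirst = errors.New("first attempt failed")

// flaky returns a work function that fails on its first call and succeeds afterwards.
func flaky() func() error {
	attempts := 0
	return func() error {
		attempts++
		if attempts == 1 {
			return errFirst
		}
		return nil
	}
}

// serve receives reply channels and answers each one with the result of work.
func serve(requests <-chan chan error, work func() error) {
	for reply := range requests {
		reply <- work()
	}
}

// call creates a reply channel, hands it to the server through the
// channel of channels, and waits for the answer.
func call(requests chan<- chan error) error {
	reply := make(chan error)
	requests <- reply
	return <-reply
}

func main() {
	requests := make(chan chan error)
	go serve(requests, flaky())

	fmt.Println(call(requests)) // first attempt failed
	fmt.Println(call(requests)) // <nil>
	close(requests)
}
```

Because every caller brings its own reply channel, a response can never be delivered to the wrong caller, even when several requests are in flight.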
More importantly, we are not using locks. The different goroutines communicate by sending each other messages in a very elegant way.
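To see why no lock is needed, here is a small sketch under the assumption that exactly one goroutine owns the shared state. The counter, `counterOwner`, `increment` and `runClients` are hypothetical and not part of the worker above. The `sync.WaitGroup` only waits for the callers to finish; it does not guard the counter.

```go
package main

import (
	"fmt"
	"sync"
)

// counterOwner is the only goroutine that touches count, so no mutex is needed.
// Each request is a reply channel; the owner increments and sends the new value back.
func counterOwner(requests <-chan chan int) {
	count := 0
	for reply := range requests {
		count++
		reply <- count
	}
}

// increment asks the owner for the next value and waits for the answer.
func increment(requests chan<- chan int) int {
	reply := make(chan int)
	requests <- reply
	return <-reply
}

// runClients starts n concurrent callers and returns the sum of the values they received.
func runClients(n int) int {
	requests := make(chan chan int)
	go counterOwner(requests)

	results := make(chan int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			results <- increment(requests)
		}()
	}
	wg.Wait()
	close(requests)
	close(results)

	sum := 0
	for v := range results {
		sum += v
	}
	return sum
}

func main() {
	// 100 callers receive the values 1..100 exactly once each, so the sum is 5050.
	fmt.Println(runClients(100)) // 5050
}
```

If two callers ever received the same value, the sum would come out lower than 5050, so the result doubles as a quick check that the updates were serialized.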
You can find me on Twitter and GitHub.
Have fun!