Circuit breaker and Hystrix: part three - timeout

In the previous article, we reviewed the max concurrent request number service degradation strategy, but some of the detailed techniques were not explained very clearly. This article covers those details and analyzes the timeout strategy as well.

Timeout

Compared with the max concurrent request number strategy, the timeout strategy is very straightforward to understand.

As we mentioned in the previous article, the core logic of hystrix lives inside the GoC function, which internally runs two goroutines. You have already seen that the first goroutine contains the logic to send the request to the target service as well as the max concurrent request number strategy. How about the second goroutine? Let's review it as follows:

go func() {
    timer := time.NewTimer(getSettings(name).Timeout)
    defer timer.Stop()

    select {
    case <-cmd.finished:
        // returnOnce has been executed in another goroutine
    case <-ctx.Done():
        returnOnce.Do(func() {
            returnTicket()
            cmd.errorWithFallback(ctx, ctx.Err())
            reportAllEvent()
        })
        return
    case <-timer.C:
        returnOnce.Do(func() {
            returnTicket()
            cmd.errorWithFallback(ctx, ErrTimeout)
            reportAllEvent()
        })
        return
    }
}()

Note that a Timer is created with the timeout duration value from the settings, and a select statement makes this goroutine wait until one case condition receives a value from its channel. The timeout case is the third one (entered when the first two cases are not triggered), which runs the fallback logic with the ErrTimeout error.

So far you should be clear about the main structure and functionality of these two goroutines. But in the details there are two Golang techniques that need your attention: sync.Once and sync.Cond.

sync.Once

You may have already noticed the following code block, which is repeated several times inside the GoC function.

returnOnce.Do(func() {
    returnTicket()
    cmd.errorWithFallback(ctx, ErrTimeout) // with various error types
    reportAllEvent()
})

returnOnce is of type sync.Once, which makes sure that the callback function passed to the Do method runs only once among different goroutines.

In this specific case, it can guarantee that both returnTicket() and reportAllEvent() execute only once. This really makes sense, because if returnTicket() runs multiple times for one GoC call, then the current concurrent request number will not be correct, right?

I wrote another article about sync.Once in detail; you can refer to it for a more in-depth explanation.

sync.Cond

The implementation of returnTicket function goes as follows:

ticketCond := sync.NewCond(cmd)
ticketChecked := false
returnTicket := func() {
    cmd.Lock()
    for !ticketChecked {
        ticketCond.Wait() // suspend the current goroutine
    }
    cmd.circuit.executorPool.Return(cmd.ticket)
    cmd.Unlock()
}

ticketCond is a condition variable; in Golang it is of type sync.Cond.

A condition variable is useful for communication between different goroutines. Concretely, the Wait method of sync.Cond suspends the current goroutine, and the Signal method wakes up a blocked goroutine to continue executing.

In the hystrix case, when ticketChecked is false, the current GoC call is not finished yet and the ticket should not be returned. So ticketCond.Wait() is called to block this goroutine and wait until the GoC call is completed, which is notified through the Signal method.

ticketChecked = true
ticketCond.Signal()

Note that the above two lines of code are always called together. Setting ticketChecked to true means that the current GoC call is finished and the ticket is ready to be returned. Moreover, the Wait call that suspends the goroutine is placed inside a for loop that re-checks the condition after every wake-up, which is also a best practice.

For more explanation of sync.Cond, please refer to another article of mine.

Fallback

Finally, let's see how the fallback function is called when the target service is not responsive.

Recall that each GoC call creates a new command instance, and the fallback function is assigned to the field with the same name, to be used later.

cmd := &command{
    run:      run,
    fallback: fallback, // fallback logic here
    start:    time.Now(),
    errChan:  make(chan error, 1),
    finished: make(chan bool, 1),
}

As we saw in the sections above, the errorWithFallback method is triggered when the timeout or the max concurrent request number threshold is hit.

func (c *command) errorWithFallback(ctx context.Context, err error) {
    eventType := "failure"
    if err == ErrCircuitOpen {
        eventType = "short-circuit"
    } else if err == ErrMaxConcurrency {
        eventType = "rejected"
    } else if err == ErrTimeout {
        eventType = "timeout"
    } else if err == context.Canceled {
        eventType = "context_canceled"
    } else if err == context.DeadlineExceeded {
        eventType = "context_deadline_exceeded"
    }

    c.reportEvent(eventType)
    fallbackErr := c.tryFallback(ctx, err)
    if fallbackErr != nil {
        c.errChan <- fallbackErr
    }
}

The errorWithFallback method runs the fallback by calling tryFallback and reports metric events such as fallback-failure and fallback-success (metric collection will be discussed in the next article).

func (c *command) tryFallback(ctx context.Context, err error) error {
    if c.fallback == nil {
        return err
    }
    fallbackErr := c.fallback(ctx, err) // execute the fallback logic here
    if fallbackErr != nil {
        c.reportEvent("fallback-failure")
        return fmt.Errorf("fallback failed with '%v'. run error was '%v'", fallbackErr, err)
    }

    c.reportEvent("fallback-success")

    return nil
}

Summary

In this article, we talked about the timeout strategy, which is the simplest one among all the strategies provided by hystrix. Some detailed Golang techniques were reviewed as well, to better understand the complex code logic.

In the next article let’s see how to collect metrics in hystrix to realize the error rate strategy.

Circuit breaker and Hystrix: part two - max concurrent requests

Background

In the second article of this series, I will review the source code of hystrix-go project to understand how to design a circuit breaker and how to implement it with Golang.

If you’re not familiar with circuit breaker pattern or hystrix-go project, please check my previous article about it.

Three service degradation strategies

Hystrix provides three different service degradation strategies to avoid the cascading failure happening in the entire system: timeout, maximum concurrent request numbers and request error rate.

  • timeout: if the service call doesn’t return a response successfully within a predefined time duration, the fallback logic runs. This strategy is the simplest one.
  • maximum concurrent request numbers: when the number of concurrent requests is beyond the threshold, the fallback logic handles the subsequent requests.
  • request error rate: hystrix records the response status of each service call; after the error rate reaches the threshold, the breaker opens, and the fallback logic executes until the breaker status changes back to closed. The error rate strategy is the most complex one.

This can be seen from the basic usage of hystrix as follows:

import (
    "time"

    "github.com/afex/hystrix-go/hystrix"
)

hystrix.ConfigureCommand("my_command", hystrix.CommandConfig{
    Timeout:               int((10 * time.Second) / time.Millisecond), // Timeout is in milliseconds
    MaxConcurrentRequests: 100,
    ErrorPercentThreshold: 25,
})

hystrix.Go("my_command", func() error {
    // talk to dependency services
    return nil
}, func(err error) error {
    // fallback logic when services are down
    return nil
})

In the above usage case, you can see that the timeout is set to 10 seconds, the maximum concurrent request number is 100, and the error rate threshold is 25 percent.

At the consumer application level, that’s nearly all of the configuration you need to set up. hystrix will make the magic happen internally.

In this series of articles, I plan to show you the internals of hystrix by reviewing the source code.

Let’s start from the easy ones: max concurrent requests and timeout. Then move on to explore the complex strategy request error rate.

GoC

Based on the above example, you can see Go function is the door to the source code of hystrix, so let’s start from it as follows:

func Go(name string, run runFunc, fallback fallbackFunc) chan error {
    runC := func(ctx context.Context) error {
        return run()
    }
    var fallbackC fallbackFuncC
    if fallback != nil {
        fallbackC = func(ctx context.Context, err error) error {
            return fallback(err)
        }
    }
    return GoC(context.Background(), name, runC, fallbackC)
}

Go function accepts three parameters:

  • name: the command name, which is bound to the circuit created inside hystrix.
  • run: a function containing the normal logic which sends the request to the dependency service.
  • fallback: a function containing the fallback logic.

Go function just wraps run and fallback with Context, which is used to control and cancel goroutines; if you’re not familiar with it, please refer to my previous article. Finally it calls the GoC function.

GoC function goes as follows:

func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error {
    // construct a new command instance
    cmd := &command{
        run:      run,
        fallback: fallback,
        start:    time.Now(),
        errChan:  make(chan error, 1),
        finished: make(chan bool, 1),
    }
    // get circuit by command name
    circuit, _, err := GetCircuit(name)
    if err != nil {
        cmd.errChan <- err
        return cmd.errChan
    }
    cmd.circuit = circuit
    // declare a condition variable sync.Cond: ticketCond, to synchronize among goroutines
    // declare a flag variable: ticketChecked, which works together with ticketCond
    ticketCond := sync.NewCond(cmd)
    ticketChecked := false
    // declare a function: returnTicket, which executes when a concurrent request is done to return the `ticket`
    returnTicket := func() {
        cmd.Lock()
        for !ticketChecked {
            ticketCond.Wait()
        }
        cmd.circuit.executorPool.Return(cmd.ticket)
        cmd.Unlock()
    }
    // declare a sync.Once instance: returnOnce, to make sure the returnTicket function executes only once
    returnOnce := &sync.Once{}

    // declare another function: reportAllEvent, used to collect the metrics
    reportAllEvent := func() {
        err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)
        if err != nil {
            log.Printf(err.Error())
        }
    }
    // launch a goroutine which executes the `run` logic
    go func() {
        defer func() { cmd.finished <- true }()

        if !cmd.circuit.AllowRequest() {
            cmd.Lock()
            ticketChecked = true
            ticketCond.Signal()
            cmd.Unlock()
            returnOnce.Do(func() {
                returnTicket()
                cmd.errorWithFallback(ctx, ErrCircuitOpen)
                reportAllEvent()
            })
            return
        }

        cmd.Lock()
        select {
        case cmd.ticket = <-circuit.executorPool.Tickets:
            ticketChecked = true
            ticketCond.Signal()
            cmd.Unlock()
        default:
            ticketChecked = true
            ticketCond.Signal()
            cmd.Unlock()
            returnOnce.Do(func() {
                returnTicket()
                cmd.errorWithFallback(ctx, ErrMaxConcurrency)
                reportAllEvent()
            })
            return
        }

        runStart := time.Now()
        runErr := run(ctx)
        returnOnce.Do(func() {
            defer reportAllEvent()
            cmd.runDuration = time.Since(runStart)
            returnTicket()
            if runErr != nil {
                cmd.errorWithFallback(ctx, runErr)
                return
            }
            cmd.reportEvent("success")
        })
    }()
    // launch the second goroutine for the timeout strategy
    go func() {
        timer := time.NewTimer(getSettings(name).Timeout)
        defer timer.Stop()

        select {
        case <-cmd.finished:
        case <-ctx.Done():
            returnOnce.Do(func() {
                returnTicket()
                cmd.errorWithFallback(ctx, ctx.Err())
                reportAllEvent()
            })
            return
        case <-timer.C:
            returnOnce.Do(func() {
                returnTicket()
                cmd.errorWithFallback(ctx, ErrTimeout)
                reportAllEvent()
            })
            return
        }
    }()

    return cmd.errChan
}

I admit it’s complex, but it’s also the core of the entire hystrix project. Be patient and let’s review it bit by bit.

First of all, the code structure of GoC function is as follows:

GoC

  1. Construct a new command object, which contains all the information for one call to the GoC function.
  2. Get the circuit breaker by name (creating it if it doesn’t exist) by calling the GetCircuit(name) function.
  3. Declare the condition variable ticketCond and the flag ticketChecked using sync.Cond, which is used to communicate between goroutines.
  4. Declare the function returnTicket. What is a ticket? What does it mean to return a ticket? Let’s discuss that in detail later.
  5. Declare another function, reportAllEvent. This function is critical to the error rate strategy, and we can leave it for detailed review in the following articles.
  6. Declare an instance of sync.Once, which is another interesting synchronization primitive provided by Golang.
  7. Launch two goroutines, each of which contains a lot of logic too. Simply speaking, the first one contains the logic of sending requests to the target service plus the max concurrent request number strategy, and the second one contains the timeout strategy.
  8. Return a channel type value.

Let’s review each of them one by one.

command

command struct goes as follows, which embeds sync.Mutex and defines several fields:

type command struct {
    sync.Mutex

    ticket      *struct{}
    start       time.Time
    errChan     chan error
    finished    chan bool
    circuit     *CircuitBreaker
    run         runFuncC
    fallback    fallbackFuncC
    runDuration time.Duration
    events      []string
}

Note that the command object itself doesn’t contain the command name, and its lifecycle is just the scope of one GoC call. This means that statistical metrics about the service request, such as the error rate and the concurrent request number, are not stored inside the command object. Instead, such metrics are stored inside the circuit field, which is of type CircuitBreaker.

CircuitBreaker

As we mentioned in the workflow of GoC function, GetCircuit(name) is called to get or create the circuit breaker. It is implemented inside circuit.go file as follows:

func init() {
    circuitBreakersMutex = &sync.RWMutex{}
    circuitBreakers = make(map[string]*CircuitBreaker)
}

func GetCircuit(name string) (*CircuitBreaker, bool, error) {
    circuitBreakersMutex.RLock()
    _, ok := circuitBreakers[name]
    if !ok {
        circuitBreakersMutex.RUnlock()
        circuitBreakersMutex.Lock()
        defer circuitBreakersMutex.Unlock()

        if cb, ok := circuitBreakers[name]; ok {
            return cb, false, nil
        }
        circuitBreakers[name] = newCircuitBreaker(name)
    } else {
        defer circuitBreakersMutex.RUnlock()
    }

    return circuitBreakers[name], !ok, nil
}

The logic is very straightforward. All the circuit breakers are stored in a map object circuitBreakers with the command name as the key.

The newCircuitBreaker constructor function and CircuitBreaker struct are as follows:

type CircuitBreaker struct {
    Name                   string
    open                   bool
    forceOpen              bool
    mutex                  *sync.RWMutex
    openedOrLastTestedTime int64

    executorPool *executorPool   // used in the strategy of max concurrent request number
    metrics      *metricExchange // used in the strategy of request error rate
}

func newCircuitBreaker(name string) *CircuitBreaker {
    c := &CircuitBreaker{}
    c.Name = name
    c.metrics = newMetricExchange(name)
    c.executorPool = newExecutorPool(name)
    c.mutex = &sync.RWMutex{}

    return c
}

All the fields of CircuitBreaker are important to understanding how the breaker works.

Two fields that are not of simple types need more analysis: executorPool and metrics.

  • executorPool: used for the max concurrent request number strategy, which is just this article’s topic.
  • metrics: used for the request error rate strategy, which will be discussed in the next article.

executorPool

We can find executorPool logics inside the pool.go file:

type executorPool struct {
    Name    string
    Metrics *poolMetrics
    Max     int
    Tickets chan *struct{} // Tickets channel
}

func newExecutorPool(name string) *executorPool {
    p := &executorPool{}
    p.Name = name
    p.Metrics = newPoolMetrics(name)
    p.Max = getSettings(name).MaxConcurrentRequests

    p.Tickets = make(chan *struct{}, p.Max)
    // send Max number of values into the Tickets channel
    for i := 0; i < p.Max; i++ {
        p.Tickets <- &struct{}{}
    }

    return p
}

It makes use of a golang channel to realize the max concurrent request number strategy. Note the Tickets field: a buffered channel with a capacity of MaxConcurrentRequests is created, and the following for loop fills the buffered channel by sending values into it until its capacity is reached.

As we have shown above, in the first goroutine of GoC function, the Tickets channel is used as follows:

go func() {
    ...
    select {
    case cmd.ticket = <-circuit.executorPool.Tickets: // receive a ticket from the Tickets channel
        ticketChecked = true
        ticketCond.Signal()
        cmd.Unlock()
    default:
        ticketChecked = true
        ticketCond.Signal()
        cmd.Unlock()
        returnOnce.Do(func() {
            returnTicket()
            cmd.errorWithFallback(ctx, ErrMaxConcurrency) // run fallback logic when concurrent requests reach the threshold
            reportAllEvent()
        })
        return
    }
    ...
}()

Each call to the GoC function obtains a ticket from the circuit.executorPool.Tickets channel until no ticket is left, which means the number of concurrent requests has reached the threshold. In that case, the default case executes, and the service is gracefully degraded with the fallback logic.

On the other side, after each call to GoC is done, the ticket needs to be sent back to circuit.executorPool.Tickets, right? Do you remember the returnTicket function mentioned in the section above? Yes, it is used exactly for this purpose. The returnTicket function defined in the GoC function goes as follows:

returnTicket := func() {
    cmd.Lock()
    for !ticketChecked {
        ticketCond.Wait()
    }
    cmd.circuit.executorPool.Return(cmd.ticket) // return the ticket to the executorPool
    cmd.Unlock()
}

It calls executorPool.Return function:

// Return function in pool.go file
func (p *executorPool) Return(ticket *struct{}) {
    if ticket == nil {
        return
    }

    p.Metrics.Updates <- poolMetricsUpdate{
        activeCount: p.ActiveCount(),
    }
    p.Tickets <- ticket // send the ticket back to the Tickets channel
}

The design and implementation of Tickets is a great example of a golang channel in a real-world application.

In summary, the max concurrent request number strategy can be illustrated as follows:

Summary

In this article, max concurrent requests strategy in hystrix is reviewed carefully, and I hope you can learn something interesting from it.

But I didn’t cover the detailed logic inside the GoC function, including sync.Cond, sync.Once and the fallback logic. Let’s review them together with the timeout strategy in the next article.

Circuit breaker and Hystrix: part one - introduction

In this series of articles, I want to talk about the circuit breaker pattern based on a popular open source project, hystrix (in fact, I will take a look at the golang version hystrix-go, instead of the original version which is written in Java).

As the first article of this series, I will give a general introduction to circuit breaker, let you know what it is and why it is important. Moreover, let’s review the background about the project hystrix-go and hystrix, and understand the basic usage with a small demo example.

Circuit breaker

Software in distributed architectures generally has many dependencies, and the failure of any dependency at some point (even the most reliable service) is inevitable.

What happens if a service we depend on becomes unresponsive? All services that rely on it risk becoming unresponsive, too. This is called catastrophic cascading failure.

The basic idea behind the circuit breaker is very simple. A circuit breaker works by wrapping calls to a target service while monitoring the failure rate. Once the failures reach a certain threshold, the circuit breaker trips, and all further calls to the circuit return with a fault or error.

The design philosophy behind the circuit breaker pattern is fail fast: when a service becomes unresponsive, other services relying on it should stop waiting for it and start dealing with the fact that the failing service may be unavailable. By preventing a single service’s failure cascading through the entire system, the circuit breaker pattern contributes to the stability and resilience of the whole system.

The circuit breaker pattern can be implemented as a finite-state machine shown below:

circuit-breaker

There are three statuses: open, closed and half-open.

  • closed: Requests are passed to the target service. Metrics like the error rate, request count and timeouts keep being monitored. When these metrics exceed a specific threshold (which is set by the developer), the breaker is tripped and transitions into the open status.
  • open: Requests are not passed to the target service; instead, the fallback logic (which is defined by the developer as well) is called to handle the failure. The breaker stays in the open status for a period of time called the sleeping window, after which the breaker can transition from open to half-open.
  • half-open: In this status, a limited number of requests are passed to the target service, aiming at resetting the status. If the target service responds successfully, the breaker is reset back to the closed status. Otherwise the breaker transitions back to the open status.

That’s the basic background about the circuit breaker; you can find much more information about it online.

Next, let’s investigate the project hystrix.

hystrix

hystrix is a very popular open source project. You can find everything about it in this link.

I want to quote several important points from the above link. Hystrix is designed to do the following:

  • Give protection from and control over latency and failure from dependencies accessed (typically over the network) via third-party client libraries.
  • Stop cascading failures in a complex distributed system.
  • Fail fast and rapidly recover.
  • Fallback and gracefully degrade when possible.
  • Enable near real-time monitoring, alerting, and operational control.

You can see hystrix perfectly implements the idea of circuit breaker pattern we talked about in the last section, right?

The hystrix project is developed in Java. In this series of articles I prefer to use a golang version, hystrix-go, which is a simplified version but implements all the main designs and ideas of the circuit breaker.

For the usage of hystrix-go, you can find it in this link, which is very straightforward to understand. And you can easily find many other articles online with demo examples showing more usage-level stuff. Please go ahead and read them.

In my articles, I want to go into the source code of hystrix-go and have an advanced investigation about how circuit breaker is implemented. Please follow up to read the next articles in this series.

Summary

In this article, I talked about the background of circuit breaker pattern and the basic information of the popular open-source project in this field hystrix-go. Next step, we will take an in-depth look at the source code of this project.

Golang synchronization primitives source code analysis: part one - sync.Once

Background

In the following series of posts, I will take an in-depth look at the synchronization primitives provided by Golang.

Although the recommended synchronization mechanism in Golang is the channel, there are several powerful synchronization primitives provided in the Golang sync package. Based on the official document: "Other than the Once and WaitGroup types, most are intended for use by low-level library routines." If you read the code of low-level open source projects or the standard packages, you will see the synchronization primitives from the sync package frequently.

As the first post in this series, let’s check the source code of sync.Once, which is also the simplest one.

sync.Once

If you have several pieces of logic running in various goroutines, and you want only one of them to execute in the end, sync.Once is a perfect option for this kind of scenario.

Let’s review the source code of sync.Once defined inside the once.go file:

package sync

import (
    "sync/atomic"
)

type Once struct {
    done uint32
    m    Mutex
}

func (o *Once) Do(f func()) {
    if atomic.LoadUint32(&o.done) == 0 {
        o.doSlow(f)
    }
}

func (o *Once) doSlow(f func()) {
    o.m.Lock()
    defer o.m.Unlock()
    if o.done == 0 {
        defer atomic.StoreUint32(&o.done, 1)
        f()
    }
}

The Once struct has a status flag done whose value is 0 when initialized. Wrap the logic you want to execute in a function f and pass it to the Do() method. When Do is called for the first time, the logic in f executes, after which the done flag is set to 1; subsequent calls to Do don’t execute f.

One misleading point is: "If once.Do(f) is called multiple times, only the first call will invoke f, even if f has a different value in each invocation." Check the following example:

package main

import (
    "fmt"
    "sync"
    "time"
)

var doOnce sync.Once

func main() {
    go TaskOne()
    go TaskTwo()
    time.Sleep(time.Second)
}

func PrintTask(id int) {
    fmt.Printf("Task %d, Run once\n", id)
}

func TaskOne() {
    doOnce.Do(func() {
        PrintTask(1)
    })
    fmt.Println("Run this every time")
}

func TaskTwo() {
    doOnce.Do(func() {
        PrintTask(2)
    })
    fmt.Println("Run this every time")
}

Even though Do is called twice with different f logic, only the first call invokes f, since both calls are bound to the same instance of Once.

fast path and slow path

As you saw above, the implementation of sync.Once is not complex. But one question came to my mind when I double-checked the code: why do we need to split the logic into two functions, Do and doSlow? Why is the second function named doSlow, and what does slow mean here?

Do you have similar questions?

I found the answer in the comments of once.go file.

if atomic.LoadUint32(&o.done) == 0 {
    // Outlined slow-path to allow inlining of the fast-path.
    o.doSlow(f)
}

Note that it mentioned two words: fast path and slow path.

  • Fast path is a term used in computer science to describe a path with shorter instruction path length through a program compared to the ‘normal’ path. For a fast path to be effective it must handle the most commonly occurring tasks more efficiently than the ‘normal’ path, leaving the latter to handle uncommon cases, corner cases, error handling, and other anomalies. Fast paths are a form of optimization.

In the Once case, since the first call to Do sets done to 1, the most common state for Once is the done flag equal to 1. The fast path in the Do function is just for this common case, while done equal to the initial value 0 can be regarded as the uncommon case, which is specially handled in the doSlow function. Performance is optimized in this way.

hot path

Another very interesting concept worth mentioning is hot path, and it occurs in the Once struct design.

type Once struct {
    // done indicates whether the action has been performed.
    // It is first in the struct because it is used in the hot path.
    // The hot path is inlined at every call site.
    // Placing done first allows more compact instructions on some architectures (amd64/x86),
    // and fewer instructions (to calculate offset) on other architectures.
    done uint32
    m    Mutex
}

At first glance it’s just a plain and ordinary struct, but the comments emphasize that done is first in the struct because it is used in the hot path. This means that done is defined as the first field of the Once struct on purpose. And the purpose is described in the comment: placing done first allows more compact instructions on some architectures (amd64/x86), and fewer instructions (to calculate the offset) on other architectures.

What does that mean? I found a great answer in this post. The conclusion is:

  • A hot path is a sequence of instructions executed very frequently.

  • When accessing the first field of a structure, we can directly dereference the pointer to the structure to access the first field. To access other fields, we need to provide an offset from the first value in addition to the struct pointer.

  • In machine code, this offset is an additional value to pass with the instruction which makes it longer. The performance impact is that the CPU must perform an addition of the offset to the struct pointer to get the address of the value to access.

  • Thus machine code to access the first field of a struct is more compact and faster.

Simply speaking, accessing the first field of a struct is faster since the CPU doesn’t need to compute the memory offset!

This is a really good lesson showing how the high-level code you write can make such a big difference at the machine level.

Golang Context package source code analysis: part 2

Background

In the last post, I shared the first part about the context package: valueCtx and cancelCtx. Let us continue the journey to discover more in this post.

WithTimeout and WithDeadline

As usual, let us start with an example:

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    cancelCtx, cancel := context.WithTimeout(context.Background(), time.Second*3)
    defer cancel()
    go task(cancelCtx)
    time.Sleep(time.Second * 4)
}

func task(ctx context.Context) {
    i := 1
    for {
        select {
        case <-ctx.Done():
            fmt.Println(ctx.Err())
            return
        default:
            fmt.Println(i)
            time.Sleep(time.Second * 1)
            i++
        }
    }
}

Since we already know the behavior of cancelCtx, it’s quite straightforward to understand how WithTimeout works. It accepts a timeout duration after which the done channel will be closed and context will be canceled. And a cancel function will be returned as well, which can be called in case the context needs to be canceled before timeout.

WithDeadline usage is quite similar to WithTimeout, you can find related example easily. Let us review the source code:

type timerCtx struct {
    cancelCtx
    timer    *time.Timer
    deadline time.Time
}

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
    return WithDeadline(parent, time.Now().Add(timeout))
}

Since WithTimeout and WithDeadline have many common points, they share the same context type: timerCtx, which embeds cancelCtx and defines two more fields: timer and deadline.

Let us review what happens when we create a timerCtx:

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
    if parent == nil {
        panic("cannot create context from nil parent")
    }
    // Get the deadline time of the parent context.
    if cur, ok := parent.Deadline(); ok && cur.Before(d) {
        // The current deadline is already sooner than the new one.
        return WithCancel(parent)
    }
    c := &timerCtx{
        cancelCtx: newCancelCtx(parent),
        deadline:  d,
    }
    propagateCancel(parent, c)
    dur := time.Until(d)
    if dur <= 0 {
        c.cancel(true, DeadlineExceeded) // deadline has already passed
        return c, func() { c.cancel(false, Canceled) }
    }
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.err == nil { // 'err' field of the embedded cancelCtx is promoted
        c.timer = time.AfterFunc(dur, func() {
            c.cancel(true, DeadlineExceeded)
        })
    }
    return c, func() { c.cancel(true, Canceled) }
}

Compared to WithCancel and WithValue, WithDeadline is more complex; let’s go through it bit by bit.

Firstly, parent.Deadline will get the deadline time for parent context. The Deadline method signature was defined in the Context interface as below:

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    ...
}

In the context package, only the emptyCtx and timerCtx types implement this method:

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
	return
}

func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
	return c.deadline, true
}

So when we call parent.Deadline(), if the parent context is also a timerCtx, which implements its own Deadline() method, you get the parent context’s actual deadline. If the parent context is a cancelCtx or valueCtx instead, the Deadline() method of emptyCtx is eventually called and you get the zero values of time.Time and bool (if you are interested, you can verify them yourself: 0001-01-01 00:00:00 +0000 UTC and false).

If the parent’s deadline is earlier than the passed-in deadline parameter, a cancelCtx is returned directly by calling WithCancel(parent). When the passed-in deadline is meaningful, we need to create a timerCtx:

//inside WithDeadline() function
...
c := &timerCtx{
	cancelCtx: newCancelCtx(parent),
	deadline:  d,
}
propagateCancel(parent, c)
...

In the above code, you see the propagateCancel method again. I discussed it in the last post; if you don’t understand it, please refer here.

Similar to cancelCtx, timerCtx sends the cancel signal by closing the done channel through its own cancel method. There are two scenarios for cancelling the context:

  • timeout cancel: when the deadline is exceeded, the done channel is closed automatically;

// inside WithDeadline function
...
// timeout cancel
c.timer = time.AfterFunc(dur, func() {
	c.cancel(true, DeadlineExceeded)
})
...
  • manual cancel: call the returned cancel function to close the done channel before the deadline;

// inside WithDeadline function
...
// return the cancel function as the second return value
return c, func() { c.cancel(true, Canceled) }
...

Both scenarios call cancel method:

func (c *timerCtx) cancel(removeFromParent bool, err error) {
	c.cancelCtx.cancel(false, err) // close the done channel and set err field
	if removeFromParent {
		// Remove this timerCtx from its parent cancelCtx's children.
		// Note: timerCtx c's parent is c.cancelCtx.Context
		removeChild(c.cancelCtx.Context, c)
	}
	c.mu.Lock()
	// stop and clean the timer
	if c.timer != nil {
		c.timer.Stop()
		c.timer = nil
	}
	c.mu.Unlock()
}

timerCtx’s cancel method delegates to cancelCtx.cancel to close the done channel, then stops and clears the timer.

Summary

In the second part of this post series, we discussed how timeout and deadline contexts are implemented at the source code level. The Golang struct embedding technique is used a lot in this part; you can compare it with a traditional OOP solution to gain a deeper understanding.

Golang Context package source code analysis: part 1

Background

As a Golang user and learner, I always think the Golang standard library is a great learning resource, which provides best practices for both the language itself and various software and programming concepts.

In this post, I will share what I learned about package context.

context is widely used in the Golang ecosystem, and I bet you often come across it. Many standard packages rely on it.

There are many good articles online explaining the background and usage examples of context, so I will not spend too much time on that; just a brief introduction here.

The problems context aims to solve are:

  • Let’s say you start a function and need to pass some common parameters to the downstream functions. Passing each of these parameters as an argument to every downstream function is impractical.
  • You start a goroutine which in turn starts more goroutines, and so on. Suppose the task you were doing is no longer needed. How do you inform all child goroutines to gracefully exit so that resources can be freed up?
  • A task should finish within a specified timeout of, say, 2 seconds. If not, it should gracefully exit or return.
  • A task should finish before a deadline, e.g. it should end before 5 pm. If not finished, it should gracefully exit and return.

You can refer to this slide from the author of the context package to understand more about the background.

In this post, I will show you the details of the context package source code. You can find all the related source code inside the context.go file. You will notice that the context package is not long: there are roughly 500 lines in total, and since many of them are comments, the actual code is only about half of that. These 200+ lines of code are a great piece of learning material in my eyes.

Source code analysis

Context interface and emptyCtx

The most basic data structure of context is the Context interface as below:

type Context interface {
	Deadline() (deadline time.Time, ok bool)
	Done() <-chan struct{}
	Err() error
	Value(key interface{}) interface{}
}

Context is just an interface, so it is hard to imagine how to use it on its own. Let us continue by reviewing some types that implement this interface.

When context is used, generally speaking, the first step is creating the root context with the context.Background() function (the contexts are chained together one by one to form a tree structure, and the root context is the first one in the chain). Let’s check what it is:

var background = new(emptyCtx)

func Background() Context {
	return background
}

The Background function returns background, which is a global variable declared as new(emptyCtx). So what is emptyCtx? Let’s continue:

// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
	return
}

func (*emptyCtx) Done() <-chan struct{} {
	return nil
}

func (*emptyCtx) Err() error {
	return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
	return nil
}

func (e *emptyCtx) String() string {
	switch e {
	case background:
		return "context.Background"
	case todo:
		return "context.TODO"
	}
	return "unknown empty Context"
}

You can see that emptyCtx is declared as a new custom type based on int. In fact, it’s not important whether emptyCtx is based on int, string or anything else. The important thing is that all four methods defined in the Context interface return zero values: the root context is never canceled, has no values, and has no deadline.

Let’s continue to review other data types.

valueCtx and WithValue

As mentioned above, one typical usage of context is passing data. In this case, you need to create a valueCtx with the WithValue function, as in the following example:

rootCtx := context.Background()

childCtx := context.WithValue(rootCtx, "msgId", "someMsgId")

WithValue is a function with only one return value:

func WithValue(parent Context, key, val interface{}) Context {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	if key == nil {
		panic("nil key")
	}
	if !reflectlite.TypeOf(key).Comparable() {
		panic("key is not comparable")
	}
	return &valueCtx{parent, key, val}
}

Please ignore the reflectlite part; I will give an in-depth discussion of it in another post. For now, we only need to care that the return value is of type &valueCtx:

type valueCtx struct {
	Context
	key, val interface{}
}

There is one interesting Golang language feature at work here: embedding, which realizes composition. Thanks to embedding, valueCtx has all four methods defined in Context.
In fact, embedding deserves much more discussion. Simply speaking, there are three types of embedding: struct in struct, interface in interface, and interface in struct. valueCtx is the last type; you can refer to this great post.

When you want to get the value out, you can use the Value method:

func (c *valueCtx) Value(key interface{}) interface{} {
	if c.key == key {
		return c.val
	}
	return c.Context.Value(key)
}

If the provided key parameter does not match the current context’s key, the parent context’s Value method is called. If the key still can’t be found, the parent delegates to its own parent as well. The search passes along the chain until the root node, which returns nil as we mentioned above:

func (*emptyCtx) Value(key interface{}) interface{} {
	return nil
}

Next, let’s review another interesting type: cancelCtx.

cancelCtx and WithCancel

First, let’s see how to use cancelCtx and WithCancel with a simple example:

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	cancelCtx, cancelFunc := context.WithCancel(context.Background())
	go task(cancelCtx)
	time.Sleep(time.Second * 3)
	cancelFunc()
	time.Sleep(time.Second * 3)
}

func task(ctx context.Context) {
	i := 1
	for {
		select {
		case <-ctx.Done():
			fmt.Println(ctx.Err())
			return
		default:
			fmt.Println(i)
			time.Sleep(time.Second * 1)
			i++
		}
	}
}

When the main goroutine wants to cancel the task goroutine, it can just call cancelFunc. The task goroutine will then exit and stop running. In this way, goroutine management becomes an easy task. Let’s review the code:

type CancelFunc func()

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	c := newCancelCtx(parent)
	propagateCancel(parent, &c)
	return &c, func() { c.cancel(true, Canceled) }
}

cancelCtx is complex; let’s go through it bit by bit.

WithCancel returns two values: the first one, &c, is a pointer to a cancelCtx created with newCancelCtx; the second one, func() { c.cancel(true, Canceled) }, is of type CancelFunc (just an ordinary function).

Let’s review cancelCtx first:

func newCancelCtx(parent Context) cancelCtx {
	return cancelCtx{Context: parent}
}

type cancelCtx struct {
	Context

	mu       sync.Mutex            // protects following fields
	done     chan struct{}         // created lazily, closed by first cancel call
	children map[canceler]struct{} // set to nil by the first cancel call
	err      error                 // set to non-nil by the first cancel call
}

Context is embedded inside cancelCtx as well, and several other fields are defined. Let’s see how it works by checking the receiver methods:

func (c *cancelCtx) Done() <-chan struct{} {
	c.mu.Lock()
	if c.done == nil {
		c.done = make(chan struct{})
	}
	d := c.done
	c.mu.Unlock()
	return d
}

The Done method returns the done channel. In the above demo, the task goroutine listens for the cancel signal on this done channel like this:

select {
case <-ctx.Done():
	fmt.Println(ctx.Err())
	return
...

The signal is triggered by calling the cancel function, so let’s review what happens inside it and how the signal is delivered through the channel. All the logic is inside the cancel method of cancelCtx:

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	if err == nil {
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return // already canceled
	}
	// set the err property when cancel is called for the first time
	c.err = err
	if c.done == nil {
		c.done = closedchan
	} else {
		close(c.done)
	}
	for child := range c.children {
		// NOTE: acquiring the child's lock while holding parent's lock.
		child.cancel(false, err)
	}
	c.children = nil
	c.mu.Unlock()

	if removeFromParent {
		removeChild(c.Context, c)
	}
}

As shown above, cancelCtx has four fields; we can understand their purposes clearly from this cancel method:

  • mu: a general lock to keep the struct goroutine-safe and avoid race conditions;
  • err: a flag representing whether the cancelCtx is cancelled or not. When the cancelCtx is created, err is nil; when cancel is called for the first time, it is set by c.err = err;
  • done: a channel which delivers the cancel signal. To realize this, context simply closes the done channel instead of sending data into it. This is an interesting point, different from what I imagined before reviewing the source code. Yes, after a channel is closed, receivers can still get the zero value of the channel's element type from it. Context makes use of this feature.
  • children: a map containing all its child contexts. When the current context is cancelled, the cancel action is propagated to the children by calling child.cancel(false, err) in the for loop. The next question is: when is the parent-child relationship established? The secret is inside the propagateCancel() function;
func propagateCancel(parent Context, child canceler) {
	done := parent.Done()
	if done == nil {
		return // parent is never canceled
	}

	select {
	case <-done:
		// parent is already canceled
		child.cancel(false, parent.Err())
		return
	default:
	}

	if p, ok := parentCancelCtx(parent); ok {
		p.mu.Lock()
		if p.err != nil {
			// parent has already been canceled
			child.cancel(false, p.err)
		} else {
			if p.children == nil {
				p.children = make(map[canceler]struct{})
			}
			p.children[child] = struct{}{}
		}
		p.mu.Unlock()
	} else {
		atomic.AddInt32(&goroutines, +1)
		go func() {
			select {
			case <-parent.Done():
				child.cancel(false, parent.Err())
			case <-child.Done():
			}
		}()
	}
}

propagateCancel contains a lot of logic, and some of it is not easy to understand; I will write another post for those parts. In this post, we only need to understand how the relationship between parent and child is established in the general case.

The key point is function parentCancelCtx, which is used to find the innermost cancellable ancestor context:

func parentCancelCtx(parent Context) (*cancelCtx, bool) {
	done := parent.Done()
	if done == closedchan || done == nil {
		return nil, false
	}
	// Value() will propagate to the root context
	p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
	if !ok {
		return nil, false
	}
	p.mu.Lock()
	ok = p.done == done
	p.mu.Unlock()
	if !ok {
		return nil, false
	}
	return p, true
}

You can notice that the Value method is called here; as we analyzed in the section above, Value passes the search along the chain up to the root context.

Back to the propagateCancel function: if a cancellable ancestor context is found, the current context is added into the children map as below:

if p.children == nil {
	p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}

The relationship is established.

Summary

In this article, we reviewed the source code of the context package and saw how Context, valueCtx and cancelCtx work.

The context package contains two other types of context: the timeout context and the deadline context. Let’s work on those in the second part of this post series.

Golang bytes.Buffer and bufio

Background

In this post, I will show you the usage and implementation of two Golang standard packages: bytes (especially bytes.Buffer) and bufio.

These two packages are widely used in the Golang ecosystem, especially in work related to networking, files and other IO tasks.

Demo application

One good way to learn new programming knowledge is to check how it is used in real-world applications. The following great demo application is from the open source book Network Programming with Go by Jan Newmarch.

For your convenience, I paste the code here. This demo consists of two parts, a client side and a server side, which together form a simple directory browsing protocol. The client would be at the user end, talking to a server somewhere else. The client sends commands to the server that allow you to list files in a directory and print the directory on the server.

First is the client side program:

package main

import (
	"bufio"
	"bytes"
	"fmt"
	"net"
	"os"
	"strings"
)

// strings used by the user interface
const (
	uiDir  = "dir"
	uiCd   = "cd"
	uiPwd  = "pwd"
	uiQuit = "quit"
)

// strings used across the network
const (
	DIR = "DIR"
	CD  = "CD"
	PWD = "PWD"
)

func main() {
	if len(os.Args) != 2 {
		fmt.Println("Usage: ", os.Args[0], "host")
		os.Exit(1)
	}

	host := os.Args[1]

	conn, err := net.Dial("tcp", host+":1202")
	checkError(err)

	reader := bufio.NewReader(os.Stdin)
	for {
		line, err := reader.ReadString('\n')
		// lose trailing whitespace
		line = strings.TrimRight(line, " \t\r\n")
		if err != nil {
			break
		}

		// split into command + arg
		strs := strings.SplitN(line, " ", 2)
		// decode user request
		switch strs[0] {
		case uiDir:
			dirRequest(conn)
		case uiCd:
			if len(strs) != 2 {
				fmt.Println("cd <dir>")
				continue
			}
			fmt.Println("CD \"", strs[1], "\"")
			cdRequest(conn, strs[1])
		case uiPwd:
			pwdRequest(conn)
		case uiQuit:
			conn.Close()
			os.Exit(0)
		default:
			fmt.Println("Unknown command")
		}
	}
}

func dirRequest(conn net.Conn) {
	conn.Write([]byte(DIR + " "))

	var buf [512]byte
	result := bytes.NewBuffer(nil)
	for {
		// read till we hit a blank line
		n, _ := conn.Read(buf[0:])
		result.Write(buf[0:n])
		length := result.Len()
		contents := result.Bytes()
		if string(contents[length-4:]) == "\r\n\r\n" {
			fmt.Println(string(contents[0 : length-4]))
			return
		}
	}
}

func cdRequest(conn net.Conn, dir string) {
	conn.Write([]byte(CD + " " + dir))
	var response [512]byte
	n, _ := conn.Read(response[0:])
	s := string(response[0:n])
	if s != "OK" {
		fmt.Println("Failed to change dir")
	}
}

func pwdRequest(conn net.Conn) {
	conn.Write([]byte(PWD))
	var response [512]byte
	n, _ := conn.Read(response[0:])
	s := string(response[0:n])
	fmt.Println("Current dir \"" + s + "\"")
}

func checkError(err error) {
	if err != nil {
		fmt.Println("Fatal error ", err.Error())
		os.Exit(1)
	}
}
client.go

Then is server side code:

package main

import (
	"fmt"
	"net"
	"os"
)

const (
	DIR = "DIR"
	CD  = "CD"
	PWD = "PWD"
)

func main() {
	service := "0.0.0.0:1202"
	tcpAddr, err := net.ResolveTCPAddr("tcp", service)
	checkError(err)

	listener, err := net.ListenTCP("tcp", tcpAddr)
	checkError(err)

	for {
		conn, err := listener.Accept()
		if err != nil {
			continue
		}
		go handleClient(conn)
	}
}

func handleClient(conn net.Conn) {
	defer conn.Close()

	var buf [512]byte
	for {
		n, err := conn.Read(buf[0:])
		if err != nil {
			conn.Close()
			return
		}

		s := string(buf[0:n])
		// decode request
		if s[0:2] == CD {
			chdir(conn, s[3:])
		} else if s[0:3] == DIR {
			dirList(conn)
		} else if s[0:3] == PWD {
			pwd(conn)
		}
	}
}

func chdir(conn net.Conn, s string) {
	if os.Chdir(s) == nil {
		conn.Write([]byte("OK"))
	} else {
		conn.Write([]byte("ERROR"))
	}
}

func pwd(conn net.Conn) {
	s, err := os.Getwd()
	if err != nil {
		conn.Write([]byte(""))
		return
	}
	conn.Write([]byte(s))
}

func dirList(conn net.Conn) {
	defer conn.Write([]byte("\r\n"))

	dir, err := os.Open(".")
	if err != nil {
		return
	}

	names, err := dir.Readdirnames(-1)
	if err != nil {
		return
	}
	for _, nm := range names {
		conn.Write([]byte(nm + "\r\n"))
	}
}

func checkError(err error) {
	if err != nil {
		fmt.Println("Fatal error ", err.Error())
		os.Exit(1)
	}
}
server.go

bytes.Buffer

Based on the above demo, let’s review how bytes.Buffer is used.

According to the official Go documentation:

Package bytes implements functions for the manipulation of byte slices.
A Buffer is a variable-sized buffer of bytes with Read and Write methods.

The bytes package itself is easy to understand: it provides functionality for manipulating byte slices. The interesting part is bytes.Buffer; what benefits can we get by using it? Let’s review the demo code where it is used.

func dirRequest(conn net.Conn) {
	conn.Write([]byte(DIR + " "))

	var buf [512]byte
	result := bytes.NewBuffer(nil)
	for {
		// read till we hit a blank line
		n, _ := conn.Read(buf[0:])
		result.Write(buf[0:n])
		length := result.Len()
		contents := result.Bytes()
		if string(contents[length-4:]) == "\r\n\r\n" {
			fmt.Println(string(contents[0 : length-4]))
			return
		}
	}
}

The above code block is from the client.go part. The scenario: the client sends the DIR command to the server side, and the server runs this DIR command and returns the contents of the current directory. Client and server use conn.Read and conn.Write to communicate with each other. The client keeps reading data in a for loop until all the data is consumed, which is marked by two consecutive \r\n sequences.

In this case, a new bytes.Buffer object is created by calling the NewBuffer function, and three other methods are called on it: Write, Len and Bytes. Let’s review their source code:

type Buffer struct {
	buf      []byte
	off      int
	lastRead readOp
}

func (b *Buffer) Write(p []byte) (n int, err error) {
	b.lastRead = opInvalid
	m, ok := b.tryGrowByReslice(len(p))
	if !ok {
		m = b.grow(len(p))
	}
	return copy(b.buf[m:], p), nil
}

func (b *Buffer) Len() int {
	return len(b.buf) - b.off
}

func (b *Buffer) Bytes() []byte {
	return b.buf[b.off:]
}

The implementation is easy to understand and needs little extra explanation. One interesting point is inside the Write function: it first checks whether the buffer has enough room for the new bytes, and if not, it calls the internal grow method to add more space.

In fact, this is the biggest benefit you get from Buffer: you don’t need to manage the dynamic growth of the underlying byte slice manually, bytes.Buffer does that for you, so you don’t waste memory by pre-allocating the maximum possible length just to be safe. To some extent, it is similar to std::vector in C++.

Bufio

Next, let’s review how the bufio package works. In our demo, it is used as follows:

reader := bufio.NewReader(os.Stdin)

for {
	line, err := reader.ReadString('\n')
	// hide other code below
}

Before we dive into the details of the demo code, let’s first understand the purpose of the bufio package.

First we need to understand that when applications run IO operations, such as reading or writing data from or to files, the network or a database, they trigger system calls at the bottom level, which are heavy from a performance point of view.

Buffered IO is a technique used to temporarily accumulate the results of an IO operation before transmitting them forward. It can increase the speed of a program by reducing the number of system calls. For example, suppose you want to read data from disk byte by byte. Instead of reading each byte directly from the disk every time, with the buffered IO technique we can read a block of data into a buffer once, and then consumers can read from the buffer in whatever way they want. Performance improves because heavy system calls are reduced.

Concretely, let’s review how the bufio package does this. The Go official document says:

Package bufio implements buffered I/O. It wraps an io.Reader or io.Writer object, creating another object (Reader or Writer) that also implements the interface but provides buffering and some help for textual I/O.

Let’s understand the definition by reading the source code:

// NewReader and NewReaderSize in bufio.go
func NewReader(rd io.Reader) *Reader {
	return NewReaderSize(rd, defaultBufSize)
}

func NewReaderSize(rd io.Reader, size int) *Reader {
	b, ok := rd.(*Reader)
	if ok && len(b.buf) >= size {
		return b
	}
	if size < minReadBufferSize {
		size = minReadBufferSize
	}
	r := new(Reader)
	r.reset(make([]byte, size), rd)
	return r
}

In our demo, we use NewReader, which calls NewReaderSize to create a new Reader instance. One thing to notice is that the parameter is of type io.Reader, an important interface that declares only one method, Read:

// the Reader interface in io.go file
type Reader interface {
	Read(p []byte) (n int, err error)
}

In our case, we use os.Stdin as the function argument, which reads data from standard input.

Then let’s review the declaration of bufio.Reader, which wraps io.Reader:

// Reader implements buffering for an io.Reader object.
type Reader struct {
	buf          []byte
	rd           io.Reader // reader provided by the client
	r, w         int       // buf read and write positions
	err          error
	lastByte     int // last byte read for UnreadByte; -1 means invalid
	lastRuneSize int // size of last rune read for UnreadRune; -1 means invalid
}

bufio.Reader has many methods defined; in our case we use ReadString, which calls another low-level method, ReadSlice.

func (b *Reader) ReadSlice(delim byte) (line []byte, err error) {
	s := 0
	for {
		// Search buffer.
		if i := bytes.IndexByte(b.buf[b.r+s:b.w], delim); i >= 0 {
			i += s
			line = b.buf[b.r : b.r+i+1]
			b.r += i + 1
			break
		}

		if b.err != nil {
			line = b.buf[b.r:b.w]
			b.r = b.w
			err = b.readErr()
			break
		}

		if b.Buffered() >= len(b.buf) {
			b.r = b.w
			line = b.buf
			err = ErrBufferFull
			break
		}

		s = b.w - b.r

		b.fill() // buffer is not full
	}

	if i := len(line) - 1; i >= 0 {
		b.lastByte = int(line[i])
		b.lastRuneSize = -1
	}

	return
}

When the buf byte slice contains data, ReadSlice searches for the target value inside it. But initially buf is empty, so it first needs to load some data. That is the most interesting part, and b.fill() is exactly for that.

func (b *Reader) fill() {
	if b.r > 0 {
		copy(b.buf, b.buf[b.r:b.w])
		b.w -= b.r
		b.r = 0
	}

	if b.w >= len(b.buf) {
		panic("bufio: tried to fill full buffer")
	}

	// Read new data: try a limited number of times.
	for i := maxConsecutiveEmptyReads; i > 0; i-- {
		n, err := b.rd.Read(b.buf[b.w:]) // call the underlying Reader
		if n < 0 {
			panic(errNegativeRead)
		}
		b.w += n
		if err != nil {
			b.err = err
			return
		}
		if n > 0 {
			return
		}
	}
	b.err = io.ErrNoProgress
}

The data is loaded into buf by calling the underlying Reader,

n, err := b.rd.Read(b.buf[b.w:])

which in our case is os.Stdin.

Customized Reader

To gain a better understanding of the buffered IO technique, we can define our own customized Reader and pass it to bufio.NewReader as follows:

package main

import (
	"bufio"
	"fmt"
	"io"
)

// customized Reader struct
type Reader struct {
	counter int
}

func (r *Reader) Read(p []byte) (n int, err error) {
	fmt.Println("Read")
	if r.counter >= 3 { // simulate EOF
		return 0, io.EOF
	}
	s := "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p"
	copy(p, s)
	r.counter += 1
	return len(s), nil
}

func main() {
	r := new(Reader)
	br := bufio.NewReader(r)
	for {
		token, err := br.ReadSlice(',')
		fmt.Printf("Token: %q\n", token)
		if err == io.EOF {
			fmt.Println("Read done")
			break
		}
	}
}

Please run the demo code above, observe the output, and think about why it produces that result.

Summary

In this post, I only talked about the Reader part of bufio; if you clearly understand the behavior explained above, it will be easy to understand Writer as well.