
Commit

Initial release
gzuidhof committed Nov 20, 2024
1 parent f0b1b53 commit ade1688
Showing 12 changed files with 1,247 additions and 2 deletions.
25 changes: 25 additions & 0 deletions .github/workflows/ci.yml
@@ -0,0 +1,25 @@
name: "Test"

on:
  pull_request:
  push:

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        go-version:
          - 1.18.x
          - 1.23.x
    steps:
      - name: checkout
        uses: actions/checkout@v4
      - name: install
        uses: actions/setup-go@v5
        with:
          go-version: '${{ matrix.go-version }}'
      - name: vet
        run: go vet ./...
      - name: test
        run: go test -v -race ./...
155 changes: 153 additions & 2 deletions README.md
@@ -1,2 +1,153 @@
# batchman
In-memory batcher that calls a function with a batch of items when a certain size or timeout is reached.
# 🦇 batchman

`batchman` provides an in-memory batcher that calls a function with a batch of items when the batch is full or a timeout is reached.

It is useful for batching requests to a service to reduce the total number of calls made.

For example, you may want to send events to your analytics service only in batches of 1000 items or after 10 seconds, whichever comes first.

Batchman provides the controls you need to implement a graceful shutdown without losing any data.

## Features

* Supports batching up to a maximum number of items or a maximum time duration.
* Strongly typed with generics.
* Thread-safe.
* Context-aware.
* Buffered to avoid blocking the caller, with a configurable buffer size.
* Support for graceful shutdown without losing any items.

## Usage

You define a function that will be called with a batch of items. It will never be called with an empty slice, and calls never overlap: the next call is only made after the previous one has returned.

```go
func Flush(ctx context.Context, items []MyItemType) {
	// Handle the items.
}
```

You then use the builder pattern with `batchman.New[MyItemType]()` and configure it with `MaxSize`, `MaxDelay`, and `BufferSize`.

```go
init := batchman.New[MyItemType]().MaxSize(2_000).MaxDelay(10 * time.Second)
```

Finally, you start the batcher with `Start` and push items to it with `Push`.

```go
batcher, err := init.Start(ctx, Flush)
if err != nil {
	panic(err)
}

err = batcher.Push(MyItemType{Some: "value"})
if err != nil {
	panic(err)
}
```

When you are done (or shutting down your program), cancel the context passed to `Start` to stop the batcher. The `Done` channel will be closed when the batcher has finished flushing the remaining data.

```go
cancel()

<-batcher.Done()
```


### Full example

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/friendlycaptcha/batchman"
)

// MyItemType is a type that will be batched.
type MyItemType struct {
	ID   int
	Name string
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// Define a function that will be called with a batch of items.
	flush := func(_ context.Context, items []MyItemType) {
		fmt.Println("Flushing a batch of", len(items), "items")
		time.Sleep(1 * time.Second)
	}

	init := batchman.New[MyItemType]().MaxSize(2000).MaxDelay(10 * time.Second)

	// Start the batcher. It will only error immediately or not at all; this is a non-blocking call.
	batcher, err := init.Start(ctx, flush)
	if err != nil {
		panic(err)
	}

	for i := 0; i < 5000; i++ {
		// Add items to the batcher; this is a non-blocking call.
		err = batcher.Push(MyItemType{ID: i, Name: "Some Name"})
		if err != nil {
			// This errors if the batcher has been stopped (by cancelling the context), or if the
			// buffer is full and the batcher is unable to accept more items.
			panic(err)
		}
	}

	cancel()

	// Wait for the batcher to finish flushing.
	<-batcher.Done()
}
```

For more examples, see the [example](example) directory.

## Tips

### Graceful Shutdown

To gracefully shutdown the batcher, cancel the context passed to `Start`. This will cause the batcher to flush any remaining items and return. The `Done` channel will be closed when the batcher has finished flushing.

The context passed to the flush function is cancelled when the batcher is stopped; you can use this to stop any long-running flush operations. Note that for any data remaining in the buffer at shutdown, the flush function is called with an already-cancelled context, so you should decide how to handle this in your implementation.

You may want to use `context.WithoutCancel` to create a new context that is not cancelled when the batcher is stopped. This can be useful if you want to give the last remaining batches a chance to finish when shutting down your program.

```go
func MyFlushFunc(ctx context.Context, items []MyItemType) {
	ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
	defer cancel()

	// Handle the items.
}
```

Alternatively, you can use a `select { case <-ctx.Done(): ... }` to check whether the context has since been cancelled.

### Timeout on shutdown

While unlikely, it is possible that the buffer is backed up and there are multiple batches that need to be flushed on shutdown.

You may want to wait a maximum time for `batcher.Done()` to close.

```go
select {
case <-batcher.Done():
	// Done flushing all items.
case <-time.After(20 * time.Second):
	// Timed out waiting for the batcher to finish flushing.
}
```

## License

[MIT](LICENSE)
140 changes: 140 additions & 0 deletions batcher.go
@@ -0,0 +1,140 @@
// Package batchman provides an in-memory batching mechanism for items of a given type.
package batchman

import (
	"context"
	"sync/atomic"
	"time"
)

// Batcher is a controller that batches items of a given type into batches with a maximum size or after a maximum delay.
type Batcher[T any] struct {
	buffer  chan T
	stopped chan struct{}

	isStopped atomic.Bool
}

// Done returns a channel that is closed when the batcher has stopped completely.
// Once a batcher has stopped, no more items can be pushed, and it cannot be started again.
//
// The batcher stops when the parent context is cancelled, but it will flush the remaining items in the buffer.
// This means that the stopped channel is closed after the last item has been flushed. Depending on the implementation
// of the flush function, this might take some time.
func (b *Batcher[T]) Done() <-chan struct{} {
	return b.stopped
}

//nolint:gocognit // Splitting it up would make it harder to read.
func (b *Batcher[T]) start(ctx context.Context,
	maxDelay time.Duration,
	maxSize int,
	flush func(ctx context.Context, items []T),
) {
	timerIsRunning := false
	isCancelled := false

	var items []T
	timer := time.NewTimer(0)
	<-timer.C // See https://github.com/golang/go/issues/12721 why this is necessary.

	for {
		var (
			isMaxDelay bool
			isMaxSize  bool
		)

		if !isCancelled {
			select {
			case <-ctx.Done():
				isCancelled = true
				b.isStopped.Store(true)

				// We can cancel the timer here, because we are not going to use it anymore.
				timer.Stop()
			case <-timer.C:
				isMaxDelay = true
				timerIsRunning = false
			case item := <-b.buffer:
				items = append(items, item)
				isMaxSize = len(items) >= maxSize
			}
		} else {
			// When the context is cancelled, we need to flush the remaining items if any are present in the buffer.
			select {
			case item := <-b.buffer:
				items = append(items, item)
				isMaxSize = len(items) >= maxSize
			default:
				// If there are no items in the buffer, carry on.
				// Possibly this default case is not needed - but better safe than sorry.
			}
		}

		shouldFlush := isCancelled || isMaxDelay || isMaxSize

		if !shouldFlush {
			// After the first item is added, start the timer.
			if len(items) == 1 {
				timer.Reset(maxDelay)
				timerIsRunning = true
			}
			continue
		}

		// If the batcher is cancelled and the buffer is not empty, we want to flush the
		// remaining items with the maximum batch size, so we skip until we reach max size or the buffer is empty.
		skipFlush := isCancelled && len(b.buffer) > 0 && !isMaxSize

		if !skipFlush {
			// We need to copy the slice to make sure that the slice that is passed is valid even if asynchronously
			// accessed in a routine spawned by the flush function.
			flushItems := make([]T, len(items))
			copy(flushItems, items)

			flush(ctx, flushItems)
			items = items[:0]

			if !isCancelled {
				if timerIsRunning && !timer.Stop() {
					<-timer.C
				}
			}
		}

		if isCancelled {
			// If the buffer is not empty, we continue flushing.
			// This loop will run again and we'll read the next item from the buffer.
			if len(b.buffer) > 0 {
				continue
			}

			close(b.stopped)
			return
		}
	}
}

// Push an item to the batcher. If the buffer is full, an error is returned.
func (b *Batcher[T]) Push(item T) error {
	if b.isStopped.Load() {
		return ErrBatcherStopped
	}

	select {
	case b.buffer <- item:
		return nil
	default:
		return ErrBufferFull
	}
}

// CurrentBufferSize returns the current number of items in the buffer.
//
// Note that the buffer size is not the number of items pending to be flushed: it doesn't include items currently
// being flushed or being grouped into the next batch.
//
// You can use this to monitor the buffer size; once the buffer fills up, you won't be able to push additional items.
func (b *Batcher[T]) CurrentBufferSize() int {
	return len(b.buffer)
}
