Skip to content

Commit

Permalink
enable the user to specify an optional encoder
Browse files Browse the repository at this point in the history
backwards compatible, if none specified still use Payload field
  • Loading branch information
davidbirdsong committed Jul 9, 2015
1 parent 0a27be8 commit d212809
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ Private key for BigQuery is specified by a pkcs12 format PEM file that was conve

Table names in BigQuery will {dataset_id}/{table_id}{date_stamp}. date_stamp formats as such: 20151230

If no Encoder is specified in TOML, then the message Payload is extracted and sent.

## Sample TOML file (with Kafka as input source):

```
Expand Down
14 changes: 12 additions & 2 deletions bq_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,24 @@ func (bqo *BqOutput) Run(or OutputRunner, h PluginHelper) (err error) {
logError(or, "Initialize Table", err)
}

encoder := or.Encoder()
for ok {
select {
case pack, ok = <-inChan:
if !ok {
break
}
payload = []byte(pack.Message.GetPayload())
pack.Recycle()
if encoder != nil {
payload, err = or.Encode(pack)
if err != nil {
or.LogError(err)
pack.Recycle()
continue
}
} else {
payload = []byte(pack.Message.GetPayload())
pack.Recycle()
}

// Write to both file and buffer
if _, err = f.Write(payload); err != nil {
Expand Down

0 comments on commit d212809

Please sign in to comment.