From d212809ed3fde3e23e29c1d5358b2d17d7de69ec Mon Sep 17 00:00:00 2001 From: david birdsong Date: Thu, 9 Jul 2015 09:26:20 -0700 Subject: [PATCH] enable the user to specify an optional encoder backwards compatible, if none specified still use Payload field --- README.md | 2 ++ bq_output.go | 14 ++++++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index f3f89ee..a7750c0 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,8 @@ Private key for BigQuery is specified by a pkcs12 format PEM file that was conve Table names in BigQuery will {dataset_id}/{table_id}{date_stamp}. date_stamp formats as such: 20151230 +If no Encoder is specified in TOML, then the message Payload is extracted and sent. + ## Sample TOML file (with Kafka as input source): ``` diff --git a/bq_output.go b/bq_output.go index fb7cb45..a97237b 100644 --- a/bq_output.go +++ b/bq_output.go @@ -116,14 +116,24 @@ func (bqo *BqOutput) Run(or OutputRunner, h PluginHelper) (err error) { logError(or, "Initialize Table", err) } + encoder := or.Encoder() for ok { select { case pack, ok = <-inChan: if !ok { break } - payload = []byte(pack.Message.GetPayload()) - pack.Recycle() + if encoder != nil { + payload, err = or.Encode(pack) + if err != nil { + or.LogError(err) + pack.Recycle() + continue + } + } else { + payload = []byte(pack.Message.GetPayload()) + pack.Recycle() + } // Write to both file and buffer if _, err = f.Write(payload); err != nil {