diff --git a/README.md b/README.md index f3f89ee..a7750c0 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,8 @@ Private key for BigQuery is specified by a pkcs12 format PEM file that was conve Table names in BigQuery will {dataset_id}/{table_id}{date_stamp}. date_stamp formats as such: 20151230 +If no Encoder is specified in TOML, then the message Payload is extracted and sent. + ## Sample TOML file (with Kafka as input source): ``` diff --git a/bq_output.go b/bq_output.go index fb7cb45..a97237b 100644 --- a/bq_output.go +++ b/bq_output.go @@ -116,14 +116,24 @@ func (bqo *BqOutput) Run(or OutputRunner, h PluginHelper) (err error) { logError(or, "Initialize Table", err) } + encoder := or.Encoder() for ok { select { case pack, ok = <-inChan: if !ok { break } - payload = []byte(pack.Message.GetPayload()) - pack.Recycle() + if encoder != nil { + payload, err = or.Encode(pack) + if err != nil { + or.LogError(err) + pack.Recycle() + continue + } + } else { + payload = []byte(pack.Message.GetPayload()) + pack.Recycle() + } // Write to both file and buffer if _, err = f.Write(payload); err != nil {