diff --git a/bq/bq.go b/bq/bq.go index 0427f9d..6336151 100644 --- a/bq/bq.go +++ b/bq/bq.go @@ -13,12 +13,14 @@ import ( bigquery "google.golang.org/api/bigquery/v2" ) +// Holds all the information needed for BigQuery operations type BqUploader struct { bq *bigquery.Service projectId string datasetId string } +// Initializes all the bigquery data needed func NewBqUploader(pkey []byte, projectId string, datasetId string, serviceEmail string) *BqUploader { conf := &jwt.Config{ Email: serviceEmail, @@ -82,6 +84,7 @@ func (bu *BqUploader) InsertRows(tableId string, list []map[string]bigquery.Json return err } +// SendInsert func (bu *BqUploader) SendInsert(tableId string, rows []*bigquery.TableDataInsertAllRequestRows) error { req := &bigquery.TableDataInsertAllRequest{ Rows: rows, @@ -98,7 +101,9 @@ func (bu *BqUploader) SendInsert(tableId string, rows []*bigquery.TableDataInser return nil } -// InsertRow inserts a new row into the desired project, dataset and table or returns an error +// InsertRow prepares and delegates to send a single row into a BigQuery Table. +// Arguments: projectID, datasetID and tableID +// Returns an error if problematic func (bu *BqUploader) InsertRow(tableId string, rowData map[string]bigquery.JsonValue) error { rows := []*bigquery.TableDataInsertAllRequestRows{ { diff --git a/bq_output.go b/bq_output.go index a4f6b85..8016bcd 100644 --- a/bq_output.go +++ b/bq_output.go @@ -1,3 +1,7 @@ +// Copyright 2015 Boa Ho Man. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + package hbq import ( @@ -15,11 +19,11 @@ import ( . "github.com/mozilla-services/heka/pipeline" ) -const INTERVAL_PERIOD time.Duration = 24 * time.Hour -const HOUR_TO_TICK int = 00 -const MINUTE_TO_TICK int = 00 -const SECOND_TO_TICK int = 00 -const MAX_BUFFER_SIZE = 1000 +const IntervalPeriod time.Duration = 24 * time.Hour +const TickHour int = 00 +const TickMinute int = 00 +const TickSecond int = 00 +const MaxBuffer = 1000 // Id is actually the datasetId type BqOutputConfig struct { @@ -47,11 +51,10 @@ func (bqo *BqOutput) ConfigStruct() interface{} { func (bqo *BqOutput) Init(config interface{}) (err error) { bqo.config = config.(*BqOutputConfig) - serviceEmail, _ := ioutil.ReadFile(bqo.config.ServiceEmail) pkey, _ := ioutil.ReadFile(bqo.config.PemFilePath) schema, _ := ioutil.ReadFile(bqo.config.SchemaFilePath) - bu := bq.NewBqUploader(pkey, bqo.config.ProjectId, bqo.config.DatasetId) + bu := bq.NewBqUploader(pkey, bqo.config.ProjectId, bqo.config.DatasetId, bqo.config.ServiceEmail) bqo.schema = schema bqo.bu = bu @@ -103,7 +106,7 @@ func (bqo *BqOutput) Run(or OutputRunner, h PluginHelper) (err error) { } // Upload Stuff (1mb) - if buf.Len() > MAX_BUFFER_SIZE { + if buf.Len() > MaxBuffer { f.Close() // Close file for uploading bqo.UploadAndReset(buf, fp, oldDay, or) f, _ = os.OpenFile(fp, fileOp, 0666) @@ -213,10 +216,10 @@ func mkDirectories(path string) { func midnightTickerUpdate() *time.Ticker { nextTick := time.Date(time.Now().Year(), time.Now().Month(), - time.Now().Day(), HOUR_TO_TICK, MINUTE_TO_TICK, SECOND_TO_TICK, + time.Now().Day(), TickHour, TickMinute, TickSecond, 0, time.Local) if !nextTick.After(time.Now()) { - nextTick = nextTick.Add(INTERVAL_PERIOD) + nextTick = nextTick.Add(IntervalPeriod) } diff := nextTick.Sub(time.Now()) return time.NewTicker(diff)