Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
aranair committed Jul 1, 2015
1 parent e63e0cf commit cc588b9
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 11 deletions.
7 changes: 6 additions & 1 deletion bq/bq.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import (
bigquery "google.golang.org/api/bigquery/v2"
)

// Holds all the information needed for BigQuery operations
type BqUploader struct {
bq *bigquery.Service
projectId string
datasetId string
}

// Initializes all the bigquery data needed
func NewBqUploader(pkey []byte, projectId string, datasetId string, serviceEmail string) *BqUploader {
conf := &jwt.Config{
Email: serviceEmail,
Expand Down Expand Up @@ -82,6 +84,7 @@ func (bu *BqUploader) InsertRows(tableId string, list []map[string]bigquery.Json
return err
}

// SendInsert
func (bu *BqUploader) SendInsert(tableId string, rows []*bigquery.TableDataInsertAllRequestRows) error {
req := &bigquery.TableDataInsertAllRequest{
Rows: rows,
Expand All @@ -98,7 +101,9 @@ func (bu *BqUploader) SendInsert(tableId string, rows []*bigquery.TableDataInser
return nil
}

// InsertRow inserts a new row into the desired project, dataset and table or returns an error
// InsertRow prepares and delegates to send a single row into a BigQuery Table.
// Arguments: projectID, datasetID and tableID
// Returns an error if problematic
func (bu *BqUploader) InsertRow(tableId string, rowData map[string]bigquery.JsonValue) error {
rows := []*bigquery.TableDataInsertAllRequestRows{
{
Expand Down
23 changes: 13 additions & 10 deletions bq_output.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// Copyright 2015 Boa Ho Man. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package hbq

import (
Expand All @@ -15,11 +19,11 @@ import (
. "github.com/mozilla-services/heka/pipeline"
)

const INTERVAL_PERIOD time.Duration = 24 * time.Hour
const HOUR_TO_TICK int = 00
const MINUTE_TO_TICK int = 00
const SECOND_TO_TICK int = 00
const MAX_BUFFER_SIZE = 1000
const IntervalPeriod time.Duration = 24 * time.Hour
const TickHour int = 00
const TickMinute int = 00
const TickSecond int = 00
const MaxBuffer = 1000

// Id is actually the datasetId
type BqOutputConfig struct {
Expand Down Expand Up @@ -47,11 +51,10 @@ func (bqo *BqOutput) ConfigStruct() interface{} {
func (bqo *BqOutput) Init(config interface{}) (err error) {
bqo.config = config.(*BqOutputConfig)

serviceEmail, _ := ioutil.ReadFile(bqo.config.ServiceEmail)
pkey, _ := ioutil.ReadFile(bqo.config.PemFilePath)
schema, _ := ioutil.ReadFile(bqo.config.SchemaFilePath)

bu := bq.NewBqUploader(pkey, bqo.config.ProjectId, bqo.config.DatasetId)
bu := bq.NewBqUploader(pkey, bqo.config.ProjectId, bqo.config.DatasetId, bqo.config.ServiceEmail)

bqo.schema = schema
bqo.bu = bu
Expand Down Expand Up @@ -103,7 +106,7 @@ func (bqo *BqOutput) Run(or OutputRunner, h PluginHelper) (err error) {
}

// Upload Stuff (1mb)
if buf.Len() > MAX_BUFFER_SIZE {
if buf.Len() > MaxBuffer {
f.Close() // Close file for uploading
bqo.UploadAndReset(buf, fp, oldDay, or)
f, _ = os.OpenFile(fp, fileOp, 0666)
Expand Down Expand Up @@ -213,10 +216,10 @@ func mkDirectories(path string) {

func midnightTickerUpdate() *time.Ticker {
nextTick := time.Date(time.Now().Year(), time.Now().Month(),
time.Now().Day(), HOUR_TO_TICK, MINUTE_TO_TICK, SECOND_TO_TICK,
time.Now().Day(), TickHour, TickMinute, TickSecond,
0, time.Local)
if !nextTick.After(time.Now()) {
nextTick = nextTick.Add(INTERVAL_PERIOD)
nextTick = nextTick.Add(IntervalPeriod)
}
diff := nextTick.Sub(time.Now())
return time.NewTicker(diff)
Expand Down

0 comments on commit cc588b9

Please sign in to comment.