From 9f6788b8dd58f53f18ff206952aa4865724236a2 Mon Sep 17 00:00:00 2001 From: Valentin Kuznetsov Date: Mon, 7 Dec 2020 08:12:48 -0500 Subject: [PATCH] Convert code to streaming interface for all APIs; add benchmark testing --- Makefile | 4 +- dbs/acquisitioneras.go | 8 +- dbs/blocks.go | 36 ++++---- dbs/datasets.go | 83 +++-------------- dbs/datatypes.go | 8 +- dbs/dbs.go | 203 ++++++++--------------------------------- dbs/files.go | 32 ++++--- dbs/outputconfigs.go | 8 +- dbs/physicsgroups.go | 8 +- dbs/primarydatasets.go | 8 +- dbs/primarydstypes.go | 8 +- dbs/processingeras.go | 8 +- dbs/releaseversions.go | 8 +- dbs/runs.go | 18 ++-- dbs/tiers.go | 31 +------ test/utils_test.go | 23 +++++ utils/utils.go | 11 +++ web/handlers.go | 60 +++--------- 18 files changed, 188 insertions(+), 377 deletions(-) diff --git a/Makefile b/Makefile index 9fc08682..85997800 100644 --- a/Makefile +++ b/Makefile @@ -33,7 +33,9 @@ install: clean: go clean; rm -rf pkg -test : test1 +test : test1 bench test1: cd test; go test -v . +bench: + cd test; go test -bench=. diff --git a/dbs/acquisitioneras.go b/dbs/acquisitioneras.go index b8d77d19..685342d1 100644 --- a/dbs/acquisitioneras.go +++ b/dbs/acquisitioneras.go @@ -1,11 +1,13 @@ package dbs import ( + "errors" "fmt" + "net/http" ) // acquisitioneras API -func (API) AcquisitionEras(params Record) []Record { +func (API) AcquisitionEras(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} where := "WHERE " @@ -14,7 +16,7 @@ func (API) AcquisitionEras(params Record) []Record { acquisitioneras := getValues(params, "data_tier_name") if len(acquisitioneras) > 1 { msg := "The acquisitioneras API does not support list of acquisitioneras" - return errorRecord(msg) + return errors.New(msg) } else if len(acquisitioneras) == 1 { op, val := opVal(acquisitioneras[0]) cond := fmt.Sprintf(" AE.ACQUISITION_ERA_NAME %s %s", op, placeholder("acquisition_era_name")) @@ -26,5 +28,5 @@ func (API) AcquisitionEras(params Record) []Record { // get SQL statement from static area stm := getSQL("acquisitioneras") // use generic query API to fetch the results from DB - return executeAll(stm+where, args...) + return executeAll(w, stm+where, args...) } diff --git a/dbs/blocks.go b/dbs/blocks.go index 1a21ac03..3157214b 100644 --- a/dbs/blocks.go +++ b/dbs/blocks.go @@ -1,12 +1,14 @@ package dbs import ( + "errors" "fmt" + "net/http" "strings" ) // blocks API -func (API) Blocks(params Record) []Record { +func (API) Blocks(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} where := "WHERE " @@ -15,7 +17,7 @@ func (API) Blocks(params Record) []Record { blocks := getValues(params, "block_name") if len(blocks) > 1 { msg := "Unsupported list of blocks" - return errorRecord(msg) + return errors.New(msg) } else if len(blocks) == 1 { op, val := opVal(blocks[0]) cond := fmt.Sprintf(" B.BLOCK_NAME %s %s", op, placeholder("block_name")) @@ -25,7 +27,7 @@ func (API) Blocks(params Record) []Record { datasets := getValues(params, "dataset") if len(datasets) > 1 { msg := "The files API does not support list of datasets" - return errorRecord(msg) + return errors.New(msg) } else if len(datasets) == 1 { op, val := opVal(datasets[0]) cond := fmt.Sprintf(" DS.DATASET %s %s", op, placeholder("dataset")) @@ -35,11 +37,11 @@ func (API) Blocks(params Record) []Record { // get SQL statement from static area stm := getSQL("blocks") // use generic query API to fetch the results from DB - return executeAll(stm+where, args...) + return executeAll(w, stm+where, args...) } // blockparent API -func (API) BlockParent(params Record) []Record { +func (API) BlockParent(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} where := "WHERE " @@ -48,7 +50,7 @@ func (API) BlockParent(params Record) []Record { blockparent := getValues(params, "block_name") if len(blockparent) > 1 { msg := "Unsupported list of blockparent" - return errorRecord(msg) + return errors.New(msg) } else if len(blockparent) == 1 { op, val := opVal(blockparent[0]) cond := fmt.Sprintf(" BP.BLOCK_NAME %s %s", op, placeholder("block_name")) @@ -58,11 +60,11 @@ func (API) BlockParent(params Record) []Record { // get SQL statement from static area stm := getSQL("blockparent") // use generic query API to fetch the results from DB - return executeAll(stm+where, args...) + return executeAll(w, stm+where, args...) } // blockchildren API -func (API) BlockChildren(params Record) []Record { +func (API) BlockChildren(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} where := "WHERE " @@ -71,7 +73,7 @@ func (API) BlockChildren(params Record) []Record { blockchildren := getValues(params, "block_name") if len(blockchildren) > 1 { msg := "Unsupported list of blockchildren" - return errorRecord(msg) + return errors.New(msg) } else if len(blockchildren) == 1 { op, val := opVal(blockchildren[0]) cond := fmt.Sprintf(" BP.BLOCK_NAME %s %s", op, placeholder("block_name")) @@ -81,11 +83,11 @@ func (API) BlockChildren(params Record) []Record { // get SQL statement from static area stm := getSQL("blockchildren") // use generic query API to fetch the results from DB - return executeAll(stm+where, args...) + return executeAll(w, stm+where, args...) } // blocksummaries API -func (API) BlockSummaries(params Record) []Record { +func (API) BlockSummaries(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var stm, where_clause string var args []interface{} @@ -113,7 +115,7 @@ func (API) BlockSummaries(params Record) []Record { dataset := getValues(params, "dataset") if len(dataset) > 1 { msg := "Unsupported list of dataset" - return errorRecord(msg) + return errors.New(msg) } else if len(dataset) == 1 { _, val := opVal(dataset[0]) args = append(args, val) @@ -124,11 +126,11 @@ func (API) BlockSummaries(params Record) []Record { stm = strings.Replace(stm, "where_clause", where_clause, -1) } // use generic query API to fetch the results from DB - return executeAll(genSQL+stm, args...) + return executeAll(w, genSQL+stm, args...) } // blockorigin API -func (API) BlockOrigin(params Record) []Record { +func (API) BlockOrigin(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} where := "WHERE " @@ -137,7 +139,7 @@ func (API) BlockOrigin(params Record) []Record { block := getValues(params, "block_name") if len(block) > 1 { msg := "Unsupported list of block" - return errorRecord(msg) + return errors.New(msg) } else if len(block) == 1 { op, val := opVal(block[0]) cond := fmt.Sprintf(" B.BLOCK_NAME %s %s", op, placeholder("block_name")) @@ -147,7 +149,7 @@ func (API) BlockOrigin(params Record) []Record { dataset := getValues(params, "dataset") if len(dataset) > 1 { msg := "Unsupported list of dataset" - return errorRecord(msg) + return errors.New(msg) } else if len(dataset) == 1 { op, val := opVal(dataset[0]) cond := fmt.Sprintf(" DS.DATASET %s %s", op, placeholder("dataset")) @@ -157,5 +159,5 @@ func (API) BlockOrigin(params Record) []Record { // get SQL statement from static area stm := getSQL("blockorigin") // use generic query API to fetch the results from DB - return executeAll(stm+where, args...) + return executeAll(w, stm+where, args...) } diff --git a/dbs/datasets.go b/dbs/datasets.go index c66790ec..ccddda3a 100644 --- a/dbs/datasets.go +++ b/dbs/datasets.go @@ -2,13 +2,14 @@ package dbs import ( "database/sql" + "errors" "fmt" "net/http" "strings" ) // Datasets API -func (API) Datasets(params Record) []Record { +func (API) Datasets(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} where := "WHERE " @@ -60,11 +61,11 @@ func (API) Datasets(params Record) []Record { vals = []interface{}{new(sql.NullString)} } // use generic query API to fetch the results from DB - return execute(genSQL+stm+where, cols, vals, args...) + return execute(w, genSQL+stm+where, cols, vals, args...) } // datasetparent API -func (API) DatasetParent(params Record) []Record { +func (API) DatasetParent(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} where := "WHERE " @@ -73,7 +74,7 @@ func (API) DatasetParent(params Record) []Record { datasetparent := getValues(params, "dataset") if len(datasetparent) > 1 { msg := "The datasetparent API does not support list of datasetparent" - return errorRecord(msg) + return errors.New(msg) } else if len(datasetparent) == 1 { op, val := opVal(datasetparent[0]) cond := fmt.Sprintf(" D.DATASET %s %s", op, placeholder("dataset")) @@ -81,16 +82,16 @@ func (API) DatasetParent(params Record) []Record { args = append(args, val) } else { msg := fmt.Sprintf("No arguments for datasetparent API") - return errorRecord(msg) + return errors.New(msg) } // get SQL statement from static area stm := getSQL("datasetparent") // use generic query API to fetch the results from DB - return executeAll(stm+where, args...) + return executeAll(w, stm+where, args...) } // datasetchildren API -func (API) DatasetChildren(params Record) []Record { +func (API) DatasetChildren(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} where := "WHERE " @@ -99,7 +100,7 @@ func (API) DatasetChildren(params Record) []Record { datasetchildren := getValues(params, "dataset") if len(datasetchildren) > 1 { msg := "The datasetchildren API does not support list of datasetchildren" - return errorRecord(msg) + return errors.New(msg) } else if len(datasetchildren) == 1 { op, val := opVal(datasetchildren[0]) cond := fmt.Sprintf(" D.DATASET %s %s", op, placeholder("dataset")) @@ -107,16 +108,16 @@ func (API) DatasetChildren(params Record) []Record { args = append(args, val) } else { msg := fmt.Sprintf("No arguments for datasetchildren API") - return errorRecord(msg) + return errors.New(msg) } // get SQL statement from static area stm := getSQL("datasetchildren") // use generic query API to fetch the results from DB - return executeAll(stm+where, args...) + return executeAll(w, stm+where, args...) } // datasetaccesstypes API -func (API) DatasetAccessTypes(params Record) []Record { +func (API) DatasetAccessTypes(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} where := " WHERE " @@ -125,7 +126,7 @@ func (API) DatasetAccessTypes(params Record) []Record { datasetaccesstypes := getValues(params, "dataset_access_type") if len(datasetaccesstypes) > 1 { msg := "The datasetaccesstypes API does not support list of datasetaccesstypes" - return errorRecord(msg) + return errors.New(msg) } else if len(datasetaccesstypes) == 1 { op, val := opVal(datasetaccesstypes[0]) cond := fmt.Sprintf(" DT.DATASET_ACCESS_TYPE %s %s", op, placeholder("dataset_access_type")) @@ -137,61 +138,5 @@ func (API) DatasetAccessTypes(params Record) []Record { // get SQL statement from static area stm := getSQL("datasetaccesstypes") // use generic query API to fetch the results from DB - return executeAll(stm+where, args...) -} - -// Datasets API -func (API) DatasetsNew(params Record, w http.ResponseWriter) error { - // variables we'll use in where clause - var args []interface{} - where := "WHERE " - - // parse detail arugment - detail := getSingleValue(params, "detail") - - // parse is_dataset_valid argument - isValid := getSingleValue(params, "is_dataset_valid") - if isValid == "" { - isValid = "1" - } - where += fmt.Sprintf("D.IS_DATASET_VALID = %s", placeholder("is_dataset_valid")) - args = append(args, isValid) - - // parse dataset_id argument - dataset_access_type := getSingleValue(params, "dataset_access_type") - if dataset_access_type == "" { - dataset_access_type = "VALID" - } - where += fmt.Sprintf(" AND DP.DATASET_ACCESS_TYPE = %s", placeholder("dataset_access_type")) - args = append(args, dataset_access_type) - - // parse dataset argument - datasets := getValues(params, "dataset") - genSQL := "" - if len(datasets) > 1 { - where += fmt.Sprintf(" AND D.DATASET in (SELECT TOKEN FROM TOKEN_GENERATOR)") - where += fmt.Sprintf(" AND DP.DATASET_ACCESS_TYPE = %s", placeholder("dataset_access_type")) - var vals []string - genSQL, vals = tokens(datasets) - for _, d := range vals { - args = append(args, d, d, d) // append three values since tokens generates placeholders for them - } - } else if len(datasets) == 1 { - op, val := opVal(datasets[0]) - where += fmt.Sprintf(" AND D.DATASET %s %s", op, placeholder("dataset")) - where += fmt.Sprintf(" AND DP.DATASET_ACCESS_TYPE = %s", placeholder("dataset_access_type")) - args = append(args, val) - } - - // get SQL statement from static area - stm := getSQL("datasets") - cols := []string{"dataset_id", "dataset", "prep_id", "xtcrosssection", "creation_date", "create_by", "last_modification_date", "last_modified_by", "primary_ds_name", "primary_ds_type", "processed_ds_name", "data_tier_name", "dataset_access_type", "acquisition_era_name", "processing_version", "physics_group_name"} - vals := []interface{}{new(sql.NullInt64), new(sql.NullString), new(sql.NullString), new(sql.NullFloat64), new(sql.NullInt64), new(sql.NullString), new(sql.NullInt64), new(sql.NullString), new(sql.NullString), new(sql.NullString), new(sql.NullString), new(sql.NullString), new(sql.NullString), new(sql.NullString), new(sql.NullInt64), new(sql.NullString)} - if strings.ToLower(detail) != "true" { - stm = getSQL("datasets_short") - cols = []string{"dataset"} - vals = []interface{}{new(sql.NullString)} - } - // use generic query API to fetch the results from DB - return executeNew(w, genSQL+stm+where, cols, vals, args...) + return executeAll(w, stm+where, args...) } diff --git a/dbs/datatypes.go b/dbs/datatypes.go index 488ea0ed..9f6563f8 100644 --- a/dbs/datatypes.go +++ b/dbs/datatypes.go @@ -1,11 +1,13 @@ package dbs import ( + "errors" "fmt" + "net/http" ) // datatypes API -func (API) DataTypes(params Record) []Record { +func (API) DataTypes(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} where := "WHERE " @@ -14,7 +16,7 @@ func (API) DataTypes(params Record) []Record { datatypes := getValues(params, "datatype") if len(datatypes) > 1 { msg := "The datatypes API does not support list of datatypes" - return errorRecord(msg) + return errors.New(msg) } else if len(datatypes) == 1 { op, val := opVal(datatypes[0]) cond := fmt.Sprintf(" DT.datatype %s %s", op, placeholder("datatype")) @@ -26,5 +28,5 @@ func (API) DataTypes(params Record) []Record { // get SQL statement from static area stm := getSQL("datatypes") // use generic query API to fetch the results from DB - return executeAll(stm+where, args...) + return executeAll(w, stm+where, args...) } diff --git a/dbs/dbs.go b/dbs/dbs.go index 27bfa603..adf24c92 100644 --- a/dbs/dbs.go +++ b/dbs/dbs.go @@ -103,106 +103,39 @@ func placeholder(pholder string) string { } } -func executeAllNew(w http.ResponseWriter, stm string, args ...interface{}) error { - enc := json.NewEncoder(w) - - if utils.VERBOSE > 1 { - log.Println(stm, args) - } - tx, err := DB.Begin() - if err != nil { - return err - } - defer tx.Rollback() - rows, err := tx.Query(stm, args...) - if err != nil { - return err - } - defer rows.Close() - - // extract columns from Rows object and create values & valuesPtrs to retrieve results - columns, _ := rows.Columns() - var cols []string - count := len(columns) - values := make([]interface{}, count) - valuePtrs := make([]interface{}, count) - rowCount := 0 - - for rows.Next() { - if rowCount == 0 { - // initialize value pointers - for i, _ := range columns { - valuePtrs[i] = &values[i] - } - } - err := rows.Scan(valuePtrs...) - if err != nil { - return err - } - rowCount += 1 - // store results into generic record (a dict) - rec := make(Record) - for i, col := range columns { - if len(cols) != len(columns) { - cols = append(cols, strings.ToLower(col)) - } - vvv := values[i] - switch val := vvv.(type) { - case *sql.NullString: - v, e := val.Value() - if e == nil { - rec[cols[i]] = v - } - case *sql.NullInt64: - v, e := val.Value() - if e == nil { - rec[cols[i]] = v - } - case *sql.NullFloat64: - v, e := val.Value() - if e == nil { - rec[cols[i]] = v - } - case *sql.NullBool: - v, e := val.Value() - if e == nil { - rec[cols[i]] = v - } - default: - rec[cols[i]] = val - } - } - err = enc.Encode(rec) - if err != nil { - return err - } - } - if err = rows.Err(); err != nil { - return err - } - return nil +// helper function to generate error record +func errorRecord(msg string) []Record { + var out []Record + erec := make(Record) + erec["error"] = msg + out = append(out, erec) + return out } // generic API to execute given statement // ideas are taken from // http://stackoverflow.com/questions/17845619/how-to-call-the-scan-variadic-function-in-golang-using-reflection -func executeAll(stm string, args ...interface{}) []Record { - var out []Record +// here we use http response writer in order to make encoder +// then we literally stream data with our encoder (i.e. write records +// to writer) +func executeAll(w http.ResponseWriter, stm string, args ...interface{}) error { + enc := json.NewEncoder(w) + enc.Encode("[") + defer enc.Encode("]") if utils.VERBOSE > 1 { log.Println(stm, args) } tx, err := DB.Begin() if err != nil { - msg := fmt.Sprintf("fail to obtain transaction, %s", err) - return errorRecord(msg) + msg := fmt.Sprintf("unable to get DB transaction %v", err) + return errors.New(msg) } defer tx.Rollback() - // rows, err := DB.Query(stm, args...) rows, err := tx.Query(stm, args...) if err != nil { - msg := fmt.Sprintf("DB.Query, query='%s' args='%v' error=%v", stm, args, err) - return errorRecord(msg) + msg := fmt.Sprintf("unable to query statement=%v error=%v", stm, err) + return errors.New(msg) } defer rows.Close() @@ -213,7 +146,6 @@ func executeAll(stm string, args ...interface{}) []Record { values := make([]interface{}, count) valuePtrs := make([]interface{}, count) rowCount := 0 - for rows.Next() { if rowCount == 0 { // initialize value pointers @@ -223,8 +155,11 @@ func executeAll(stm string, args ...interface{}) []Record { } err := rows.Scan(valuePtrs...) if err != nil { - msg := fmt.Sprintf("rows.Scan, dest='%v', error=%v", valuePtrs, err) - return errorRecord(msg) + msg := fmt.Sprintf("unabelt to scan DB results %s", err) + return errors.New(msg) + } + if rowCount != 0 { + enc.Encode(",") } rowCount += 1 // store results into generic record (a dict) @@ -256,97 +191,26 @@ func executeAll(stm string, args ...interface{}) []Record { rec[cols[i]] = v } default: - // fmt.Printf("SQL result: %v (%T) %v (%T)\n", vvv, vvv, val, val) rec[cols[i]] = val } - // rec[cols[i]] = values[i] } - out = append(out, rec) - } - if err = rows.Err(); err != nil { - return errorRecord(fmt.Sprintf("unable to scan rows %v", err)) - } - return out -} - -// similar to executeAll function but it takes explicit set of columns and values -func execute(stm string, cols []string, vals []interface{}, args ...interface{}) []Record { - var out []Record - - if utils.VERBOSE > 1 { - log.Println(stm, args) - } - tx, err := DB.Begin() - if err != nil { - msg := fmt.Sprintf("unable to obtain transaction %v", err) - errorRecord(msg) - } - defer tx.Rollback() - // rows, err := DB.Query(stm, args...) - rows, err := tx.Query(stm, args...) - if err != nil { - msg := fmt.Sprintf("DB.Query, query='%s' args='%v' error=%v", stm, args, err) - return errorRecord(msg) - } - defer rows.Close() - - // loop over rows - for rows.Next() { - err := rows.Scan(vals...) + err = enc.Encode(rec) if err != nil { - msg := fmt.Sprintf("rows.Scan, vals='%v', error=%v", vals, err) - return errorRecord(msg) - } - rec := make(Record) - for i, _ := range cols { - vvv := vals[i] - switch val := vvv.(type) { - case *sql.NullString: - v, e := val.Value() - if e == nil { - rec[cols[i]] = v - } - case *sql.NullInt64: - v, e := val.Value() - if e == nil { - rec[cols[i]] = v - } - case *sql.NullFloat64: - v, e := val.Value() - if e == nil { - rec[cols[i]] = v - } - case *sql.NullBool: - v, e := val.Value() - if e == nil { - rec[cols[i]] = v - } - default: - // fmt.Printf("SQL result: %v (%T) %v (%T)\n", vvv, vvv, val, val) - rec[cols[i]] = val - } - // rec[cols[i]] = vals[i] + return err } - out = append(out, rec) } if err = rows.Err(); err != nil { - return errorRecord(fmt.Sprintf("unable to scan rows: %v", err)) + msg := fmt.Sprintf("rows error %v", err) + return errors.New(msg) } - return out -} - -// helper function to generate error record -func errorRecord(msg string) []Record { - var out []Record - erec := make(Record) - erec["error"] = msg - out = append(out, erec) - return out + return nil } // similar to executeAll function but it takes explicit set of columns and values -func executeNew(w http.ResponseWriter, stm string, cols []string, vals []interface{}, args ...interface{}) error { +func execute(w http.ResponseWriter, stm string, cols []string, vals []interface{}, args ...interface{}) error { enc := json.NewEncoder(w) + enc.Encode("[") + defer enc.Encode("]") if utils.VERBOSE > 1 { log.Println(stm, args) @@ -366,12 +230,17 @@ func executeNew(w http.ResponseWriter, stm string, cols []string, vals []interfa defer rows.Close() // loop over rows + rowCount := 0 for rows.Next() { err := rows.Scan(vals...) if err != nil { msg := fmt.Sprintf("rows.Scan, vals='%v', error=%v", vals, err) return errors.New(msg) } + if rowCount != 0 { + enc.Encode(",") + } + rowCount += 1 rec := make(Record) for i, _ := range cols { vvv := vals[i] diff --git a/dbs/files.go b/dbs/files.go index 19752eab..e791b231 100644 --- a/dbs/files.go +++ b/dbs/files.go @@ -1,12 +1,14 @@ package dbs import ( + "errors" "fmt" + "net/http" "strings" ) // files API -func (API) Files(params Record) []Record { +func (API) Files(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} where := "WHERE " @@ -15,7 +17,7 @@ func (API) Files(params Record) []Record { files := getValues(params, "logical_file_name") if len(files) > 1 { msg := "The files API does not support list of files" - return errorRecord(msg) + return errors.New(msg) } else if len(files) == 1 { op, val := opVal(files[0]) cond := fmt.Sprintf(" F.LOGICAL_FILE_NAME %s %s", op, placeholder("logical_file_name")) @@ -25,7 +27,7 @@ func (API) Files(params Record) []Record { datasets := getValues(params, "dataset") if len(datasets) > 1 { msg := "The files API does not support list of datasets" - return errorRecord(msg) + return errors.New(msg) } else if len(datasets) == 1 { op, val := opVal(datasets[0]) cond := fmt.Sprintf(" D.DATASET %s %s", op, placeholder("dataset")) @@ -35,7 +37,7 @@ func (API) Files(params Record) []Record { block_names := getValues(params, "block_name") if len(block_names) > 1 { msg := "The files API does not support list of block_names" - return errorRecord(msg) + return errors.New(msg) } else if len(block_names) == 1 { op, val := opVal(block_names[0]) cond := fmt.Sprintf(" B.BLOCK_NAME %s %s", op, placeholder("block_name")) @@ -45,11 +47,11 @@ func (API) Files(params Record) []Record { // get SQL statement from static area stm := getSQL("files") // use generic query API to fetch the results from DB - return executeAll(stm+where, args...) + return executeAll(w, stm+where, args...) } // filechildren API -func (API) FileChildren(params Record) []Record { +func (API) FileChildren(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} where := "WHERE " @@ -58,7 +60,7 @@ func (API) FileChildren(params Record) []Record { filechildren := getValues(params, "logical_file_name") if len(filechildren) > 1 { msg := "The filechildren API does not support list of filechildren" - return errorRecord(msg) + return errors.New(msg) } else if len(filechildren) == 1 { op, val := opVal(filechildren[0]) cond := fmt.Sprintf(" F.LOGICAL_FILE_NAME %s %s", op, placeholder("logical_file_name")) @@ -68,11 +70,11 @@ func (API) FileChildren(params Record) []Record { // get SQL statement from static area stm := getSQL("filechildren") // use generic query API to fetch the results from DB - return executeAll(stm+where, args...) + return executeAll(w, stm+where, args...) } // fileparent API -func (API) FileParent(params Record) []Record { +func (API) FileParent(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} where := "WHERE " @@ -81,7 +83,7 @@ func (API) FileParent(params Record) []Record { fileparent := getValues(params, "logical_file_name") if len(fileparent) > 1 { msg := "The fileparent API does not support list of fileparent" - return errorRecord(msg) + return errors.New(msg) } else if len(fileparent) == 1 { op, val := opVal(fileparent[0]) cond := fmt.Sprintf(" F.LOGICAL_FILE_NAME %s %s", op, placeholder("logical_file_name")) @@ -91,11 +93,11 @@ func (API) FileParent(params Record) []Record { // get SQL statement from static area stm := getSQL("fileparent") // use generic query API to fetch the results from DB - return executeAll(stm+where, args...) + return executeAll(w, stm+where, args...) } // filesummaries API -func (API) FileSummaries(params Record) []Record { +func (API) FileSummaries(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} var stm, whererun, wheresql_run_list, wheresql_run_range, wheresql_isFileValid string @@ -160,11 +162,11 @@ func (API) FileSummaries(params Record) []Record { stm = strings.Replace(stm, "join_valid_ds2", join_valid_ds2, -1) // use generic query API to fetch the results from DB - return executeAll(stm, args...) + return executeAll(w, stm, args...) } // filelumis API -func (API) FileLumis(params Record) []Record { +func (API) FileLumis(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} var wheresql, wheresql_run_list, wheresql_run_range string @@ -237,5 +239,5 @@ func (API) FileLumis(params Record) []Record { } // use generic query API to fetch the results from DB - return executeAll(stm, args...) + return executeAll(w, stm, args...) } diff --git a/dbs/outputconfigs.go b/dbs/outputconfigs.go index 3b18332d..91b6fa7b 100644 --- a/dbs/outputconfigs.go +++ b/dbs/outputconfigs.go @@ -1,11 +1,13 @@ package dbs import ( + "errors" "fmt" + "net/http" ) // outputconfigs API -func (API) OutputConfigs(params Record) []Record { +func (API) OutputConfigs(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var sql1, sql2, stm string var args []interface{} @@ -17,7 +19,7 @@ func (API) OutputConfigs(params Record) []Record { block_id := getValues(params, "block_id") if len(block_id) > 1 { msg := "The outputconfigs API does not support list of block_id" - return errorRecord(msg) + return errors.New(msg) } else if len(block_id) == 1 { _, bid = opVal(block_id[0]) } @@ -81,5 +83,5 @@ func (API) OutputConfigs(params Record) []Record { } // use generic query API to fetch the results from DB - return executeAll(stm+where, args...) + return executeAll(w, stm+where, args...) } diff --git a/dbs/physicsgroups.go b/dbs/physicsgroups.go index 8065f1b1..5a2982d1 100644 --- a/dbs/physicsgroups.go +++ b/dbs/physicsgroups.go @@ -1,11 +1,13 @@ package dbs import ( + "errors" "fmt" + "net/http" ) // physicsgroups API -func (API) PhysicsGroups(params Record) []Record { +func (API) PhysicsGroups(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} where := "WHERE " @@ -14,7 +16,7 @@ func (API) PhysicsGroups(params Record) []Record { physicsgroups := getValues(params, "physics_group_name") if len(physicsgroups) > 1 { msg := "The physicsgroups API does not support list of physicsgroups" - return errorRecord(msg) + return errors.New(msg) } else if len(physicsgroups) == 1 { op, val := opVal(physicsgroups[0]) cond := fmt.Sprintf(" pg.PHYSICS_GROUP_NAME %s %s", op, placeholder("physics_group_name")) @@ -26,5 +28,5 @@ func (API) PhysicsGroups(params Record) []Record { // get SQL statement from static area stm := getSQL("physicsgroups") // use generic query API to fetch the results from DB - return executeAll(stm+where, args...) + return executeAll(w, stm+where, args...) } diff --git a/dbs/primarydatasets.go b/dbs/primarydatasets.go index 3ad57875..da1c5acc 100644 --- a/dbs/primarydatasets.go +++ b/dbs/primarydatasets.go @@ -1,11 +1,13 @@ package dbs import ( + "errors" "fmt" + "net/http" ) // primarydatasets API -func (API) PrimaryDatasets(params Record) []Record { +func (API) PrimaryDatasets(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} where := "WHERE " @@ -14,7 +16,7 @@ func (API) PrimaryDatasets(params Record) []Record { primarydatasets := getValues(params, "primary_ds_name") if len(primarydatasets) > 1 { msg := "The primarydatasets API does not support list of primarydatasets" - return errorRecord(msg) + return errors.New(msg) } else if len(primarydatasets) == 1 { op, val := opVal(primarydatasets[0]) cond := fmt.Sprintf(" P.PRIMARY_DS_NAME %s %s", op, placeholder("primary_ds_name")) @@ -26,5 +28,5 @@ func (API) PrimaryDatasets(params Record) []Record { // get SQL statement from static area stm := getSQL("primarydatasets") // use generic query API to fetch the results from DB - return executeAll(stm+where, args...) + return executeAll(w, stm+where, args...) } diff --git a/dbs/primarydstypes.go b/dbs/primarydstypes.go index c723dcd8..36553342 100644 --- a/dbs/primarydstypes.go +++ b/dbs/primarydstypes.go @@ -1,11 +1,13 @@ package dbs import ( + "errors" "fmt" + "net/http" ) // primarydstypes API -func (API) Primarydstypes(params Record) []Record { +func (API) Primarydstypes(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} where := "WHERE " @@ -14,7 +16,7 @@ func (API) Primarydstypes(params Record) []Record { primarydstypes := getValues(params, "primary_ds_type") if len(primarydstypes) > 1 { msg := "The primarydstypes API does not support list of primarydstypes" - return errorRecord(msg) + return errors.New(msg) } else if len(primarydstypes) == 1 { op, val := opVal(primarydstypes[0]) cond := fmt.Sprintf(" PDT.PRIMARY_DS_TYPE %s %s", op, placeholder("primary_ds_type")) @@ -26,5 +28,5 @@ func (API) Primarydstypes(params Record) []Record { // get SQL statement from static area stm := getSQL("primarydstypes") // use generic query API to fetch the results from DB - return executeAll(stm+where, args...) + return executeAll(w, stm+where, args...) } diff --git a/dbs/processingeras.go b/dbs/processingeras.go index c401d2d3..79436b25 100644 --- a/dbs/processingeras.go +++ b/dbs/processingeras.go @@ -1,11 +1,13 @@ package dbs import ( + "errors" "fmt" + "net/http" ) // processingeras API -func (API) ProcessingEras(params Record) []Record { +func (API) ProcessingEras(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} where := "WHERE " @@ -14,7 +16,7 @@ func (API) ProcessingEras(params Record) []Record { processingeras := getValues(params, "processing_version") if len(processingeras) > 1 { msg := "The processingeras API does not support list of processingeras" - return errorRecord(msg) + return errors.New(msg) } else if len(processingeras) == 1 { op, val := opVal(processingeras[0]) cond := fmt.Sprintf(" PE.PROCESSING_VERSION %s %s", op, placeholder("processing_version")) @@ -26,5 +28,5 @@ func (API) ProcessingEras(params Record) []Record { // get SQL statement from static area stm := getSQL("processingeras") // use generic query API to fetch the results from DB - return executeAll(stm+where, args...) + return executeAll(w, stm+where, args...) } diff --git a/dbs/releaseversions.go b/dbs/releaseversions.go index b5adfe7c..c6829d66 100644 --- a/dbs/releaseversions.go +++ b/dbs/releaseversions.go @@ -1,11 +1,13 @@ package dbs import ( + "errors" "fmt" + "net/http" ) // releaseversions API -func (API) ReleaseVersions(params Record) []Record { +func (API) ReleaseVersions(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} where := "WHERE " @@ -14,7 +16,7 @@ func (API) ReleaseVersions(params Record) []Record { releaseversions := getValues(params, "release_version") if len(releaseversions) > 1 { msg := "The releaseversions API does not support list of releaseversions" - return errorRecord(msg) + return errors.New(msg) } else if len(releaseversions) == 1 { op, val := opVal(releaseversions[0]) cond := fmt.Sprintf(" RV.release_version %s %s", op, placeholder("release_version")) @@ -26,5 +28,5 @@ func (API) ReleaseVersions(params Record) []Record { // get SQL statement from static area stm := getSQL("releaseversions") // use generic query API to fetch the results from DB - return executeAll(stm+where, args...) + return executeAll(w, stm+where, args...) } diff --git a/dbs/runs.go b/dbs/runs.go index c3ae2284..327be166 100644 --- a/dbs/runs.go +++ b/dbs/runs.go @@ -1,11 +1,13 @@ package dbs import ( + "errors" "fmt" + "net/http" ) // runs API -func (API) Runs(params Record) []Record { +func (API) Runs(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} where := "" @@ -17,7 +19,7 @@ func (API) Runs(params Record) []Record { dataset := getValues(params, "dataset") if len(runs) > 1 { msg := "The runs API does not support list of runs" - return errorRecord(msg) + return errors.New(msg) } else if len(runs) == 1 { op, val := opVal(runs[0]) cond := fmt.Sprintf(" FL.run_num %s %s", op, placeholder("run_num")) @@ -40,16 +42,16 @@ func (API) Runs(params Record) []Record { args = append(args, val) } else { msg := fmt.Sprintf("No arguments for runs API") - return errorRecord(msg) + return errors.New(msg) } // get SQL statement from static area stm := getSQL("runs") // use generic query API to fetch the results from DB - return executeAll(stm+where, args...) + return executeAll(w, stm+where, args...) } // runs API -func (API) RunSummaries(params Record) []Record { +func (API) RunSummaries(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} where := " WHERE " @@ -60,7 +62,7 @@ func (API) RunSummaries(params Record) []Record { runs := getValues(params, "run_num") if len(runs) > 1 { msg := "The runs API does not support list of runs" - return errorRecord(msg) + return errors.New(msg) } else if len(runs) == 1 { _, val := opVal(runs[0]) cond := fmt.Sprintf(" RUN_NUM = %s", placeholder("run_num")) @@ -68,7 +70,7 @@ func (API) RunSummaries(params Record) []Record { where += addCond(where, cond) } else { msg := fmt.Sprintf("No arguments for runsummaries API") - return errorRecord(msg) + return errors.New(msg) } dataset := getValues(params, "dataset") if len(dataset) == 1 { @@ -80,5 +82,5 @@ func (API) RunSummaries(params Record) []Record { where += addCond(where, cond) } // use generic query API to fetch the results from DB - return executeAll(stm+where, args...) + return executeAll(w, stm+where, args...) } diff --git a/dbs/tiers.go b/dbs/tiers.go index 106c13f6..073d975a 100644 --- a/dbs/tiers.go +++ b/dbs/tiers.go @@ -6,33 +6,8 @@ import ( "net/http" ) -// tiers API -func (API) DataTiers(params Record) []Record { - // variables we'll use in where clause - var args []interface{} - where := "WHERE " - - // parse dataset argument - tiers := getValues(params, "data_tier_name") - if len(tiers) > 1 { - msg := "The datatiers API does not support list of tiers" - return errorRecord(msg) - } else if len(tiers) == 1 { - op, val := opVal(tiers[0]) - cond := fmt.Sprintf(" DT.DATA_TIER_NAME %s %s", op, placeholder("data_tier_name")) - where += addCond(where, cond) - args = append(args, val) - } else { - where = "" // no arguments - } - // get SQL statement from static area - stm := getSQL("tiers") - // use generic query API to fetch the results from DB - return executeAll(stm+where, args...) -} - -// new imeplementation of using io.Writer -func (API) DataTiersNew(params Record, w http.ResponseWriter) error { +// DataTiers API +func (API) DataTiers(params Record, w http.ResponseWriter) error { // variables we'll use in where clause var args []interface{} where := "WHERE " @@ -53,5 +28,5 @@ func (API) DataTiersNew(params Record, w http.ResponseWriter) error { // get SQL statement from static area stm := getSQL("tiers") // use generic query API to fetch the results from DB - return executeAllNew(w, stm+where, args...) + return executeAll(w, stm+where, args...) } diff --git a/test/utils_test.go b/test/utils_test.go index edfa7b53..19fcc26f 100644 --- a/test/utils_test.go +++ b/test/utils_test.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "testing" "github.com/vkuznet/dbs2go/utils" @@ -18,3 +19,25 @@ func TestInList(t *testing.T) { t.Error("Fail TestInList") } } + +// TestRecordSize +func TestRecordSize(t *testing.T) { + rec := make(map[string]int) + rec["a"] = 1 + rec["b"] = 2 + size, err := utils.RecordSize(rec) + if err != nil { + t.Error("Fail in RecordSize", err) + } + fmt.Println("record", rec, "size", size) +} + +// BenchmarkRecordSize +func BenchmarkRecordSize(b *testing.B) { + rec := make(map[string]int) + rec["a"] = 1 + rec["b"] = 2 + for i := 0; i < b.N; i++ { + utils.RecordSize(rec) + } +} diff --git a/utils/utils.go b/utils/utils.go index 411d6797..5ffa520a 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -1,6 +1,8 @@ package utils import ( + "encoding/binary" + "encoding/json" "fmt" "io/ioutil" "log" @@ -14,6 +16,15 @@ var VERBOSE int var STATICDIR string var PROFILE bool +// RecordSize +func RecordSize(v interface{}) (int64, error) { + data, err := json.Marshal(v) + if err == nil { + return int64(binary.Size(data)), nil + } + return 0, err +} + // helper function to return Stack func Stack() string { trace := make([]byte, 2048) diff --git a/web/handlers.go b/web/handlers.go index 1dff654b..d2d3609e 100644 --- a/web/handlers.go +++ b/web/handlers.go @@ -5,6 +5,7 @@ package web import ( "encoding/binary" "encoding/json" + "errors" "fmt" "log" "net/http" @@ -103,71 +104,34 @@ func DBSGetHandler(w http.ResponseWriter, r *http.Request, a string) (int, int64 for k, v := range r.Form { params[k] = v } - var records []dbs.Record var api dbs.API + var err error if a == "datatiers" { - records = api.DataTiers(params) + err = api.DataTiers(params, w) } else if a == "datasets" { - records = api.Datasets(params) + err = api.Datasets(params, w) } else if a == "blocks" { - records = api.Blocks(params) + err = api.Blocks(params, w) } else if a == "files" { - records = api.Files(params) + err = api.Files(params, w) } else { - rec := make(dbs.Record) - rec["error"] = fmt.Sprintf("not implemented API %s", api) - records = append(records, rec) + err = errors.New(fmt.Sprintf("not implemented API %s", api)) } - var size int64 - enc := json.NewEncoder(w) - for _, rec := range records { - err := enc.Encode(rec) - size += int64(binary.Size(rec)) - if err != nil { - return http.StatusInternalServerError, 0, err - } - + if err != nil { + return http.StatusInternalServerError, 0, err } - // data, err := json.Marshal(records) - // if err != nil { - // return http.StatusInternalServerError, 0, err - // } - // w.WriteHeader(status) - // w.Write(data) - // size := int64(binary.Size(data)) + var size int64 return status, size, nil } // DatatiersHandler func DatatiersHandler(w http.ResponseWriter, r *http.Request) (int, int64, error) { - // return DBSGetHandler(w, r, "datatiers") - status := http.StatusOK - var params dbs.Record - for k, v := range r.Form { - params[k] = v - } - var api dbs.API - err := api.DataTiersNew(params, w) - if err != nil { - return http.StatusInternalServerError, 0, err - } - return status, 0, nil + return DBSGetHandler(w, r, "datatiers") } // DatasetsHandler func DatasetsHandler(w http.ResponseWriter, r *http.Request) (int, int64, error) { - // return DBSGetHandler(w, r, "datasets") - status := http.StatusOK - var params dbs.Record - for k, v := range r.Form { - params[k] = v - } - var api dbs.API - err := api.DatasetsNew(params, w) - if err != nil { - return http.StatusInternalServerError, 0, err - } - return status, 0, nil + return DBSGetHandler(w, r, "datasets") } // BlocksHandler