Skip to content

Commit

Permalink
create db lua 7970 (#8000)
Browse files Browse the repository at this point in the history
* create db lua 7970

* delete db lua 7970

* description 7970

* catalogID 7970

* create shortcut 7970

* table param from Lua 7970

* different param options 7970

* small fix 7970

* json marshall 7970

* docs 7970

* export glue options 7970

* conditions 7970

* try param 7970

* try param 7970

* review fixes 7970

* createdb review fixes 7970

* exporter review fixes 7970

* exporter review fixes 7970

* Update lua.md params

* Update glue_exporter.lua

* Apply suggestions from code review

Co-authored-by: isan_rivkin <isanrivkin@gmail.com>

* table not json

* Apply suggestions from code review

Co-authored-by: isan_rivkin <isanrivkin@gmail.com>

* exporter review fixes 7970

* Update lua.md

* Update lua.md

* Update glue_exporter.lua

---------

Co-authored-by: isan_rivkin <isanrivkin@gmail.com>
  • Loading branch information
nadavsteindler and Isan-Rivkin authored Aug 7, 2024
1 parent 01cc109 commit 1e1d2f5
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 11 deletions.
47 changes: 42 additions & 5 deletions docs/howto/hooks/lua.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,45 @@ local aws = require("aws")
local glue = aws.glue_client("ACCESS_KEY_ID", "SECRET_ACCESS_KEY", "REGION")
```

### `aws/glue.create_database(database, options)`

Create a new Database in Glue Catalog.

Parameters:

- `database(string)`: Glue Database name.
- `options(table)` (optional):
- `error_on_already_exists(boolean)`: Whether the call should fail with an error if a database with this name already exists (default: `true`)
- `create_db_input(table)`: a table that is passed "as is" to AWS and mirrors the AWS SDK [CreateDatabaseInput](https://docs.aws.amazon.com/glue/latest/webapi/API_CreateDatabase.html#API_CreateDatabase_RequestSyntax). If `DatabaseInput.Name` is set, it must equal `database`.

Example:

```lua
local opts = {
error_on_already_exists = false,
create_db_input = {DatabaseInput = {Description = "Created via LakeFS Action"}, Tags = {Owner = "Joe"}}
}
glue.create_database(db, opts)
```

### `aws/glue.delete_database(database, catalog_id)`

Delete an existing Database in Glue Catalog.

Parameters:

- `database(string)`: Glue Database name.
- `catalog_id(string)` (optional): Glue Catalog ID

Example:

```lua
glue.delete_database(db, "461129977393")
```

### `aws/glue.get_table(database, table [, catalog_id])`

Describe a table from the Glue catalog.
Describe a table from the Glue Catalog.

Example:

Expand Down Expand Up @@ -703,14 +739,15 @@ Parameters:
- `glue`: AWS glue client
- `db(string)`: glue database name
- `table_src_path(string)`: path to table spec (e.g. _lakefs_tables/my_table.yaml)
- `create_table_input(Table)`: Input equal mapping to [table_input](https://docs.aws.amazon.com/glue/latest/webapi/API_CreateTable.html#API_CreateTable_RequestSyntax) in AWS, the same as we use for `glue.create_table`.
- `create_table_input(table)`: Input equal mapping to [table_input](https://docs.aws.amazon.com/glue/latest/webapi/API_CreateTable.html#API_CreateTable_RequestSyntax) in AWS, the same as we use for `glue.create_table`.
should contain inputs describing the data format (e.g. InputFormat, OutputFormat, SerdeInfo) since the exporter is agnostic to this.
by default this function will configure table location and schema.
- `action_info(Table)`: the global action object.
- `options(Table)`:
- `action_info(table)`: the global action object.
- `options(table)`:
- `table_name(string)`: Override default glue table name
- `debug(boolean)`: Print debug messages during the export
- `export_base_uri(string)`: Override the default prefix in S3 for symlink location e.g. s3://other-bucket/path/
- `create_db_input(table)`: if specified, the database is created (or verified to already exist) before the table is exported; if omitted, the database must already exist. The parameter expects a table that is passed "as is" to AWS and mirrors the AWS SDK [CreateDatabaseInput](https://docs.aws.amazon.com/glue/latest/webapi/API_CreateDatabase.html#API_CreateDatabase_RequestSyntax)

When creating a glue table, the final table input will consist of the `create_table_input` input parameter and lakeFS computed defaults that will override it:

Expand Down Expand Up @@ -738,7 +775,7 @@ local table_input = {
EXTERNAL = "TRUE",
["parquet.compression"] = "SNAPPY"
}
exporter.export_glue(glue, "my-db", "_lakefs_tables/animals.yaml", table_input, action, {debug=true})
exporter.export_glue(glue, "my-db", "_lakefs_tables/animals.yaml", table_input, action, {debug=true, create_db_input = {DatabaseInput = {Description="DB exported from LakeFS"}, Tags = {Owner = "Joe"}}})
```

### `lakefs/catalogexport/glue_exporter.get_full_table_name(descriptor, action_info)`
Expand Down
3 changes: 2 additions & 1 deletion esti/export_hooks_files/glue/scripts/glue_exporter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ local aws = require("aws")
local exporter = require("lakefs/catalogexport/glue_exporter")
action.commit_id = "{{ .OverrideCommitID }}" -- override commit id to use specific symlink file previously created
local glue = aws.glue_client(args.aws.aws_access_key_id, args.aws.aws_secret_access_key, args.aws.aws_region)
exporter.export_glue(glue, args.catalog.db_name, args.table_source, args.catalog.table_input, action, {debug=true})
local create_db_input_table = {DatabaseInput = {Description="Created by Glue Exporter"}}
exporter.export_glue(glue, args.catalog.db_name, args.table_source, args.catalog.table_input, action, {debug=true, create_db_input = create_db_input_table})
11 changes: 10 additions & 1 deletion pkg/actions/lua/lakefs/catalogexport/glue_exporter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ end
- table_name(string): override default glue table name
- debug(boolean)
- export_base_uri(string): override the default prefix in S3 for symlink location i.e s3://other-bucket/path/
- create_db_input(table): parameters for creating the database. If nil, then the DB must already exist
]]
local function export_glue(glue, db, table_src_path, create_table_input, action_info, options)
local opts = options or {}
Expand All @@ -127,6 +128,14 @@ local function export_glue(glue, db, table_src_path, create_table_input, action_
error("table " .. descriptor.type .. " in path " .. table_src_path .. " not supported")
end

if opts.create_db_input ~= nil then
local dbopts = { error_on_already_exists = false, create_db_input = opts.create_db_input }
glue.create_database(db, dbopts)
if opts.debug then
print("success creating / verifying glue database")
end
end

-- finalize create glue table input
local table_input = build_glue_create_table_input(create_table_input, descriptor, symlink_location, columns,
partitions, action_info, opts)
Expand All @@ -143,6 +152,6 @@ local function export_glue(glue, db, table_src_path, create_table_input, action_
end

return {
get_full_table_name=get_full_table_name,
get_full_table_name = get_full_table_name,
export_glue = export_glue
}
86 changes: 82 additions & 4 deletions pkg/actions/lua/storage/aws/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/glue"
"github.com/aws/aws-sdk-go-v2/service/glue/types"
"github.com/mitchellh/mapstructure"
"github.com/treeverse/lakefs/pkg/actions/lua/util"
)

Expand Down Expand Up @@ -55,10 +56,12 @@ type GlueClient struct {
}

var glueFunctions = map[string]func(client *GlueClient) lua.Function{
"get_table": getTable,
"create_table": createTable,
"update_table": updateTable,
"delete_table": deleteTable,
"get_table": getTable,
"create_table": createTable,
"update_table": updateTable,
"delete_table": deleteTable,
"create_database": createDatabase,
"delete_database": deleteDatabase,
}

func (c *GlueClient) client() *glue.Client {
Expand Down Expand Up @@ -223,3 +226,78 @@ func getTable(c *GlueClient) lua.Function {
return 2
}
}

// glueCreateDatabaseOpts holds the optional settings accepted by the Lua
// `create_database` call. The keys expected in the Lua options table are the
// ones named in the mapstructure tags below.
type glueCreateDatabaseOpts struct {
// CreateDBInput is the request passed to the Glue CreateDatabase API call.
CreateDBInput *glue.CreateDatabaseInput `json:"create_db_input" mapstructure:"create_db_input"`
// ErrorOnAlreadyExists controls whether an AlreadyExistsException returned by
// Glue is reported as an error (true) or treated as success (false).
ErrorOnAlreadyExists bool `json:"error_on_already_exists" mapstructure:"error_on_already_exists"`
}

// createDatabase returns the Lua function backing
// `glue.create_database(database [, options])`.
//
// Lua arguments:
//  1. database (string): name of the Glue database to create.
//  2. options (table, optional): decoded into glueCreateDatabaseOpts on top of
//     the defaults (request containing only the database name,
//     error_on_already_exists = true).
//
// The Lua function returns no values. It raises a Lua error when the options
// table cannot be read or decoded, when the decoded request has no database
// name or a name different from the `database` argument, or when the AWS call
// fails. An AlreadyExistsException is swallowed only if
// error_on_already_exists was set to false.
func createDatabase(c *GlueClient) lua.Function {
	return func(l *lua.State) int {
		client := c.client()
		database := lua.CheckString(l, 1)

		// Defaults; any fields present in the Lua options table override them.
		createDBOpts := &glueCreateDatabaseOpts{
			CreateDBInput: &glue.CreateDatabaseInput{
				DatabaseInput: &types.DatabaseInput{
					Name: aws.String(database),
				},
			},
			ErrorOnAlreadyExists: true,
		}
		// decode opts if provided and override defaults
		if !l.IsNoneOrNil(2) {
			t, err := util.PullTable(l, 2)
			if err != nil {
				lua.Errorf(l, "PullTable: %s", err.Error())
				panic("unreachable")
			}
			if err := mapstructure.Decode(t, createDBOpts); err != nil {
				lua.Errorf(l, "DecodeTable: %s", err.Error())
				panic("unreachable")
			}
		}

		// The pointers below are dereferenced next; guard against the decoded
		// options having cleared any of them so a malformed table yields a Lua
		// error rather than a nil-pointer panic.
		input := createDBOpts.CreateDBInput
		if input == nil || input.DatabaseInput == nil || input.DatabaseInput.Name == nil {
			lua.Errorf(l, "create_db_input is missing DatabaseInput.Name for database (%s)", database)
			panic("unreachable")
		}

		// The name in the request must agree with the explicit parameter.
		if name := *input.DatabaseInput.Name; name != database {
			lua.Errorf(l, "database name in input (%s) doesn't match database name parameter (%s)", name, database)
			panic("unreachable")
		}

		// AWS API call
		_, err := client.CreateDatabase(c.ctx, input)
		if err != nil {
			var errExists *types.AlreadyExistsException
			if !createDBOpts.ErrorOnAlreadyExists && errors.As(err, &errExists) {
				// Caller asked for "create if missing": an existing DB is success.
				return 0
			}
			lua.Errorf(l, "%s", err.Error())
			panic("unreachable")
		}
		return 0
	}
}

// deleteDatabase returns the Lua function backing
// `glue.delete_database(database [, catalog_id])`.
//
// Lua arguments:
//  1. database (string): name of the Glue database to delete.
//  2. catalog_id (string, optional): Glue Catalog ID. When omitted or nil,
//     CatalogId is left unset in the request.
//
// The Lua function returns no values and raises a Lua error carrying the AWS
// error message if the DeleteDatabase call fails.
func deleteDatabase(c *GlueClient) lua.Function {
	return func(l *lua.State) int {
		client := c.client()
		database := aws.String(lua.CheckString(l, 1))

		// catalog_id is optional: treat an explicit nil like an absent argument
		// (matching how create_database checks its optional argument) instead
		// of failing the string check.
		var catalogID *string
		if !l.IsNoneOrNil(2) {
			catalogID = aws.String(lua.CheckString(l, 2))
		}

		_, err := client.DeleteDatabase(c.ctx, &glue.DeleteDatabaseInput{
			Name:      database,
			CatalogId: catalogID,
		})
		if err != nil {
			lua.Errorf(l, "%s", err.Error())
			panic("unreachable")
		}
		return 0
	}
}

0 comments on commit 1e1d2f5

Please sign in to comment.