diff --git a/docs/howto/hooks/lua.md b/docs/howto/hooks/lua.md index 7c8c2e1058e..8f1b5a5f2b6 100644 --- a/docs/howto/hooks/lua.md +++ b/docs/howto/hooks/lua.md @@ -168,9 +168,45 @@ local aws = require("aws") local glue = aws.glue_client("ACCESS_KEY_ID", "SECRET_ACCESS_KEY", "REGION") ``` +### `aws/glue.create_database(database, options)` + +Create a new Database in Glue Catalog. + +Parameters: + +- `database(string)`: Glue Database name. +- `options(table)` (optional): + - `error_on_already_exists(boolean)`: Whether the call should fail with an error if a DB with this name already exists. Defaults to `true`. + - `create_db_input(table)`: a table that is passed "as is" to AWS and is parallel to the AWS SDK [CreateDatabaseInput](https://docs.aws.amazon.com/glue/latest/webapi/API_CreateDatabase.html#API_CreateDatabase_RequestSyntax) + +Example: + +```lua +local opts = { + error_on_already_exists = false, + create_db_input = {DatabaseInput = {Description = "Created via LakeFS Action"}, Tags = {Owner = "Joe"}} +} +glue.create_database(db, opts) +``` + +### `aws/glue.delete_database(database, catalog_id)` + +Delete an existing Database in Glue Catalog. + +Parameters: + +- `database(string)`: Glue Database name. +- `catalog_id(string)` (optional): Glue Catalog ID + +Example: + +```lua +glue.delete_database(db, "461129977393") +``` + ### `aws/glue.get_table(database, table [, catalog_id)` -Describe a table from the Glue catalog. +Describe a table from the Glue Catalog. Example: @@ -703,14 +739,15 @@ Parameters: - `glue`: AWS glue client - `db(string)`: glue database name - `table_src_path(string)`: path to table spec (e.g. _lakefs_tables/my_table.yaml) -- `create_table_input(Table)`: Input equal mapping to [table_input](https://docs.aws.amazon.com/glue/latest/webapi/API_CreateTable.html#API_CreateTable_RequestSyntax) in AWS, the same as we use for `glue.create_table`. 
+- `create_table_input(table)`: Input equal mapping to [table_input](https://docs.aws.amazon.com/glue/latest/webapi/API_CreateTable.html#API_CreateTable_RequestSyntax) in AWS, the same as we use for `glue.create_table`. should contain inputs describing the data format (e.g. InputFormat, OutputFormat, SerdeInfo) since the exporter is agnostic to this. by default this function will configure table location and schema. -- `action_info(Table)`: the global action object. -- `options(Table)`: +- `action_info(table)`: the global action object. +- `options(table)`: - `table_name(string)`: Override default glue table name - `debug(boolean` - `export_base_uri(string)`: Override the default prefix in S3 for symlink location e.g. s3://other-bucket/path/ + - `create_db_input(table)`: if this is specified, then it indicates we want to create a new database for the table export. The parameter expects a table that is converted to JSON and passed "as is" to AWS and is parallel to the AWS SDK [CreateDatabaseInput](https://docs.aws.amazon.com/glue/latest/webapi/API_CreateDatabase.html#API_CreateDatabase_RequestSyntax) When creating a glue table, the final table input will consist of the `create_table_input` input parameter and lakeFS computed defaults that will override it: @@ -738,7 +775,7 @@ local table_input = { EXTERNAL: "TRUE" "parquet.compression": "SNAPPY" } -exporter.export_glue(glue, "my-db", "_lakefs_tables/animals.yaml", table_input, action, {debug=true}) +exporter.export_glue(glue, "my-db", "_lakefs_tables/animals.yaml", table_input, action, {debug=true, create_db_input = {DatabaseInput = {Description="DB exported from LakeFS"}, Tags = {Owner = "Joe"}}}) ``` ### `lakefs/catalogexport/glue_exporter.get_full_table_name(descriptor, action_info)` diff --git a/esti/export_hooks_files/glue/scripts/glue_exporter.lua b/esti/export_hooks_files/glue/scripts/glue_exporter.lua index 4ad77919f3d..4cae3e4ce2c 100644 --- a/esti/export_hooks_files/glue/scripts/glue_exporter.lua +++ 
b/esti/export_hooks_files/glue/scripts/glue_exporter.lua @@ -2,4 +2,5 @@ local aws = require("aws") local exporter = require("lakefs/catalogexport/glue_exporter") action.commit_id = "{{ .OverrideCommitID }}" -- override commit id to use specific symlink file previously created local glue = aws.glue_client(args.aws.aws_access_key_id, args.aws.aws_secret_access_key, args.aws.aws_region) -exporter.export_glue(glue, args.catalog.db_name, args.table_source, args.catalog.table_input, action, {debug=true}) \ No newline at end of file +local create_db_input_table = {DatabaseInput = {Description="Created by Glue Exporter"}} +exporter.export_glue(glue, args.catalog.db_name, args.table_source, args.catalog.table_input, action, {debug=true, create_db_input = create_db_input_table}) diff --git a/pkg/actions/lua/lakefs/catalogexport/glue_exporter.lua b/pkg/actions/lua/lakefs/catalogexport/glue_exporter.lua index 786eef88f02..fd0fb87e97a 100644 --- a/pkg/actions/lua/lakefs/catalogexport/glue_exporter.lua +++ b/pkg/actions/lua/lakefs/catalogexport/glue_exporter.lua @@ -103,6 +103,7 @@ end - table_name(string): override default glue table name - debug(boolean) - export_base_uri(string): override the default prefix in S3 for symlink location i.e s3://other-bucket/path/ + - create_db_input(table): parameters for creating the database. If nil, then the DB must already exist ]] local function export_glue(glue, db, table_src_path, create_table_input, action_info, options) local opts = options or {} @@ -127,6 +128,14 @@ local function export_glue(glue, db, table_src_path, create_table_input, action_ error("table " .. descriptor.type .. " in path " .. table_src_path .. 
" not supported") end + if opts.create_db_input ~= nil then + local dbopts = { error_on_already_exists = false, create_db_input = opts.create_db_input } + glue.create_database(db, dbopts) + if opts.debug then + print("success creating / verifying glue database") + end + end + -- finallize create glue table input local table_input = build_glue_create_table_input(create_table_input, descriptor, symlink_location, columns, partitions, action_info, opts) @@ -143,6 +152,6 @@ local function export_glue(glue, db, table_src_path, create_table_input, action_ end return { - get_full_table_name=get_full_table_name, + get_full_table_name = get_full_table_name, export_glue = export_glue } diff --git a/pkg/actions/lua/storage/aws/glue.go b/pkg/actions/lua/storage/aws/glue.go index 72916d8a38a..56f0fa8209d 100644 --- a/pkg/actions/lua/storage/aws/glue.go +++ b/pkg/actions/lua/storage/aws/glue.go @@ -11,6 +11,7 @@ import ( "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/glue" "github.com/aws/aws-sdk-go-v2/service/glue/types" + "github.com/mitchellh/mapstructure" "github.com/treeverse/lakefs/pkg/actions/lua/util" ) @@ -55,10 +56,12 @@ type GlueClient struct { } var glueFunctions = map[string]func(client *GlueClient) lua.Function{ - "get_table": getTable, - "create_table": createTable, - "update_table": updateTable, - "delete_table": deleteTable, + "get_table": getTable, + "create_table": createTable, + "update_table": updateTable, + "delete_table": deleteTable, + "create_database": createDatabase, + "delete_database": deleteDatabase, } func (c *GlueClient) client() *glue.Client { @@ -223,3 +226,78 @@ func getTable(c *GlueClient) lua.Function { return 2 } } + +type glueCreateDatabaseOpts struct { + CreateDBInput *glue.CreateDatabaseInput `json:"create_db_input" mapstructure:"create_db_input"` + ErrorOnAlreadyExists bool `json:"error_on_already_exists" mapstructure:"error_on_already_exists"` +} + +func createDatabase(c *GlueClient) lua.Function { + 
return func(l *lua.State) int { + client := c.client() + database := lua.CheckString(l, 1) + + createDBOpts := &glueCreateDatabaseOpts{ + CreateDBInput: &glue.CreateDatabaseInput{ + DatabaseInput: &types.DatabaseInput{ + Name: aws.String(database), + }, + }, + ErrorOnAlreadyExists: true, + } + // decode opts if provided and override defaults + if !l.IsNoneOrNil(2) { + var t interface{} + t, err := util.PullTable(l, 2) + if err != nil { + lua.Errorf(l, "PullTable: %s", err.Error()) + panic("unreachable") + } + err = mapstructure.Decode(t, createDBOpts) + if err != nil { + lua.Errorf(l, "DecodeTable: %s", err.Error()) + panic("unreachable") + } + } + + if *createDBOpts.CreateDBInput.DatabaseInput.Name != database { + lua.Errorf(l, "database name in input (%s) doesn't match database name parameter (%s)", *createDBOpts.CreateDBInput.DatabaseInput.Name, database) + panic("unreachable") + } + + // AWS API call + _, err := client.CreateDatabase(c.ctx, createDBOpts.CreateDBInput) + if err != nil { + var errExists *types.AlreadyExistsException + if !createDBOpts.ErrorOnAlreadyExists && errors.As(err, &errExists) { + return 0 + } + lua.Errorf(l, "%s", err.Error()) + panic("unreachable") + } + return 0 + } +} + +func deleteDatabase(c *GlueClient) lua.Function { + return func(l *lua.State) int { + client := c.client() + database := aws.String(lua.CheckString(l, 1)) + + // check if catalog ID provided + var catalogID *string + if !l.IsNone(2) { + catalogID = aws.String(lua.CheckString(l, 2)) + } + + _, err := client.DeleteDatabase(c.ctx, &glue.DeleteDatabaseInput{ + Name: database, + CatalogId: catalogID, + }) + if err != nil { + lua.Errorf(l, "%s", err.Error()) + panic("unreachable") + } + return 0 + } +}