Skip to content

Commit

Permalink
Add docs on how to set alias after initial data completion (#223)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ulimo authored Dec 23, 2023
1 parent 86c6ce0 commit 21899b9
Showing 1 changed file with 46 additions and 1 deletion.
47 changes: 46 additions & 1 deletion docs/docs/connectors/elasticsearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,49 @@ sqlBuilder.Sql(@"
factory.AddElasticsearchSink("*", elasticSearchConnectionSettings);

...
```
```

### Set alias on initial data completion

One way to integrate with elasticsearch is to create a new index for each new stream version and change an alias to point to the new index.
This is possible by using the *GetIndexNameFunc* and *OnInitialDataSent* functions in the options.

Example:

```csharp
factory.AddElasticsearchSink("*", new FlowtideDotNet.Connector.ElasticSearch.FlowtideElasticsearchOptions()
{
ConnectionSettings = connectionSettings,
CustomMappings = (props) =>
{
// Add cusotm mappings
},
GetIndexNameFunc = (writeRelation) =>
{
// Set an index name that will be unique for this run
// The index name must be possible to be recovered between crashes to write to the same index
return $"{writeRelation.NamedObject.DotSeperated}-{tagVersion}";
},
OnInitialDataSent = async (client, writeRelation, indexName) =>
{
var aliasName = writeRelation.NamedObject.DotSeperated;
// Get indices that the alias already points to.
var oldIndices = await client.GetIndicesPointingToAliasAsync(aliasName);
// Add the index to the alias
var putAliasResponse = await client.Indices.PutAliasAsync(indexName, aliasName);

if (putAliasResponse.IsValid)
{
// Remove all old indices that existed on the alias
foreach (var oldIndex in oldIndices)
{
await client.Indices.DeleteAsync(oldIndex);
}
}
else
{
throw new InvalidOperationException(putAliasResponse.ServerError.Error.StackTrace);
}
},
});
```

0 comments on commit 21899b9

Please sign in to comment.