diff --git a/docs/docs/connectors/elasticsearch.md b/docs/docs/connectors/elasticsearch.md index 9313122b2..d225d19b6 100644 --- a/docs/docs/connectors/elasticsearch.md +++ b/docs/docs/connectors/elasticsearch.md @@ -40,4 +40,49 @@ sqlBuilder.Sql(@" factory.AddElasticsearchSink("*", elasticSearchConnectionSettings); ... -``` \ No newline at end of file +``` + +### Set alias on initial data completion + +One way to integrate with elasticsearch is to create a new index for each new stream version and change an alias to point to the new index. +This is possible by using the *GetIndexNameFunc* and *OnInitialDataSent* functions in the options. + +Example: + +```csharp +factory.AddElasticsearchSink("*", new FlowtideDotNet.Connector.ElasticSearch.FlowtideElasticsearchOptions() +{ + ConnectionSettings = connectionSettings, + CustomMappings = (props) => + { + // Add cusotm mappings + }, + GetIndexNameFunc = (writeRelation) => + { + // Set an index name that will be unique for this run + // The index name must be possible to be recovered between crashes to write to the same index + return $"{writeRelation.NamedObject.DotSeperated}-{tagVersion}"; + }, + OnInitialDataSent = async (client, writeRelation, indexName) => + { + var aliasName = writeRelation.NamedObject.DotSeperated; + // Get indices that the alias already points to. + var oldIndices = await client.GetIndicesPointingToAliasAsync(aliasName); + // Add the index to the alias + var putAliasResponse = await client.Indices.PutAliasAsync(indexName, aliasName); + + if (putAliasResponse.IsValid) + { + // Remove all old indices that existed on the alias + foreach (var oldIndex in oldIndices) + { + await client.Indices.DeleteAsync(oldIndex); + } + } + else + { + throw new InvalidOperationException(putAliasResponse.ServerError.Error.StackTrace); + } + }, +}); +```