Skip to content

Commit

Permalink
Add elasticsearch get index function and on initial data sent func
Browse files Browse the repository at this point in the history
This change should allow implementations that set aliases on initial data completion, and deletion of old indices.
  • Loading branch information
Ulimo committed Dec 23, 2023
1 parent e3bb37f commit 86c6ce0
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ public static class FlowtideElasticsearchReadWriteFactoryExtensions
public static ReadWriteFactory AddElasticsearchSink(
this ReadWriteFactory factory,
string regexPattern,
ConnectionSettings options,
Action<IProperties>? customMappings = null,
FlowtideElasticsearchOptions options,
Action<WriteRelation>? transform = null)
{
if (regexPattern == "*")
Expand All @@ -45,13 +44,7 @@ public static ReadWriteFactory AddElasticsearchSink(
}
transform?.Invoke(writeRel);

FlowtideElasticsearchOptions flowtideElasticsearchOptions = new FlowtideElasticsearchOptions()
{
ConnectionSettings = options,
CustomMappings = customMappings
};

var sink = new ElasticSearchSink(writeRel, flowtideElasticsearchOptions, Operators.Write.ExecutionMode.OnCheckpoint, opt);
var sink = new ElasticSearchSink(writeRel, options, Operators.Write.ExecutionMode.OnCheckpoint, opt);
sink.CreateIndexAndMappings();
return sink;
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using Nest;
using FlowtideDotNet.Substrait.Relations;
using Nest;
using System.Text.Json;

namespace FlowtideDotNet.Connector.ElasticSearch
{
Expand All @@ -13,5 +15,15 @@ public class FlowtideElasticsearchOptions
/// If the index does not exist the properties will be empty.
/// </summary>
public Action<IProperties>? CustomMappings { get; set; }

public Func<WriteRelation, string>? GetIndexNameFunc { get; set; }

/// <summary>
/// Function that gets called after the initial data has been saved to elasticsearch.
/// Parameters are the elasticsearch client, the write relation and the index name.
///
/// This function can be used for instance to create an alias to the index.
/// </summary>
public Func<IElasticClient, WriteRelation, string, Task>? OnInitialDataSent { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,26 @@ internal class ElasticSearchSink : SimpleGroupedWriteOperator
private StreamEventToJsonElastic? m_serializer;
private IReadOnlyList<int> m_primaryKeys;
private readonly string m_displayName;
private string m_indexName;

public ElasticSearchSink(WriteRelation writeRelation, FlowtideElasticsearchOptions elasticsearchOptions, ExecutionMode executionMode, ExecutionDataflowBlockOptions executionDataflowBlockOptions)
: base(executionMode, executionDataflowBlockOptions)
{
this.writeRelation = writeRelation;
if (elasticsearchOptions.GetIndexNameFunc == null)
{
m_indexName = writeRelation.NamedObject.DotSeperated;
}
else
{
m_indexName = elasticsearchOptions.GetIndexNameFunc(writeRelation);
}

this.m_elasticsearchOptions = elasticsearchOptions;
m_displayName = $"ElasticSearchSink-{writeRelation.NamedObject.DotSeperated}";
m_displayName = $"ElasticSearchSink-{m_indexName}";
var idFieldIndex = FindUnderscoreIdField(writeRelation);
m_primaryKeys = new List<int>() { idFieldIndex };
m_serializer = new StreamEventToJsonElastic(idFieldIndex, writeRelation.NamedObject.DotSeperated, writeRelation.TableSchema.Names);
m_serializer = new StreamEventToJsonElastic(idFieldIndex, m_indexName, writeRelation.TableSchema.Names);
}

private int FindUnderscoreIdField(WriteRelation writeRelation)
Expand All @@ -66,10 +76,10 @@ internal void CreateIndexAndMappings()
{
var m_client = new ElasticClient(m_elasticsearchOptions.ConnectionSettings);

var existingIndex = m_client.Indices.Get(writeRelation.NamedObject.DotSeperated);
var existingIndex = m_client.Indices.Get(m_indexName);
IndexState? indexState = default;
IProperties? properties = null;
if (existingIndex != null && existingIndex.IsValid && existingIndex.Indices.TryGetValue(writeRelation.NamedObject.DotSeperated, out indexState))
if (existingIndex != null && existingIndex.IsValid && existingIndex.Indices.TryGetValue(m_indexName, out indexState))
{
properties = indexState.Mappings.Properties ?? new Properties();
}
Expand All @@ -85,14 +95,14 @@ internal void CreateIndexAndMappings()

if (indexState == null)
{
var response = m_client.Indices.Create(writeRelation.NamedObject.DotSeperated);
var response = m_client.Indices.Create(m_indexName);
if (!response.IsValid)
{
throw new InvalidOperationException(response.ServerError.Error.Reason);
}
}

var mapResponse = m_client.Map(new PutMappingRequest(writeRelation.NamedObject.DotSeperated)
var mapResponse = m_client.Map(new PutMappingRequest(m_indexName)
{
Properties = properties
});
Expand All @@ -109,6 +119,15 @@ protected override async Task<MetadataResult> SetupAndLoadMetadataAsync()
return new MetadataResult(m_primaryKeys);
}

protected override Task OnInitialDataSent()
{
if (m_elasticsearchOptions.OnInitialDataSent != null)
{
return m_elasticsearchOptions.OnInitialDataSent(m_client!, writeRelation, m_indexName);
}
return base.OnInitialDataSent();
}

protected override async Task UploadChanges(IAsyncEnumerable<SimpleChangeEvent> rows, Watermark watermark, CancellationToken cancellationToken)
{
Debug.Assert(m_client != null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ public ElasticsearchTestStream(ElasticSearchFixture elasticSearchFixture, string

protected override void AddWriteResolvers(ReadWriteFactory factory)
{
factory.AddElasticsearchSink("*", elasticSearchFixture.GetConnectionSettings(), customMappings: customMapping);
factory.AddElasticsearchSink("*", new FlowtideElasticsearchOptions()
{
ConnectionSettings = elasticSearchFixture.GetConnectionSettings(),
CustomMappings = customMapping
});
}
}
}

0 comments on commit 86c6ce0

Please sign in to comment.