Skip to content

Commit

Permalink
Add support for custom mappings in elasticsearch (#220)
Browse files Browse the repository at this point in the history
* Add support for custom mappings in elasticsearch

* Add test cases

* Fix duplicate code
  • Loading branch information
Ulimo authored Dec 22, 2023
1 parent d10565b commit a8f6cf8
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ namespace FlowtideDotNet.Core.Engine
{
public static class FlowtideElasticsearchReadWriteFactoryExtensions
{
public static ReadWriteFactory AddElasticsearchSink(this ReadWriteFactory factory, string regexPattern, ConnectionSettings options, Action<WriteRelation>? transform = null)
public static ReadWriteFactory AddElasticsearchSink(
this ReadWriteFactory factory,
string regexPattern,
ConnectionSettings options,
Action<IProperties>? customMappings = null,
Action<WriteRelation>? transform = null)
{
if (regexPattern == "*")
{
Expand All @@ -42,10 +47,13 @@ public static ReadWriteFactory AddElasticsearchSink(this ReadWriteFactory factor

FlowtideElasticsearchOptions flowtideElasticsearchOptions = new FlowtideElasticsearchOptions()
{
ConnectionSettings = options
ConnectionSettings = options,
CustomMappings = customMappings
};

return new ElasticSearchSink(writeRel, flowtideElasticsearchOptions, Operators.Write.ExecutionMode.OnCheckpoint, opt);
var sink = new ElasticSearchSink(writeRel, flowtideElasticsearchOptions, Operators.Write.ExecutionMode.OnCheckpoint, opt);
sink.CreateIndexAndMappings();
return sink;
});
return factory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,13 @@ namespace FlowtideDotNet.Connector.ElasticSearch
public class FlowtideElasticsearchOptions
{
public ConnectionSettings? ConnectionSettings { get; set; }

/// <summary>
/// Action to apply custom mappings to the index
/// This will be called on startup.
///
/// If the index does not exist the properties will be empty.
/// </summary>
public Action<IProperties>? CustomMappings { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,50 @@ private int FindUnderscoreIdField(WriteRelation writeRelation)

public override string DisplayName => m_displayName;

internal void CreateIndexAndMappings()
{
var m_client = new ElasticClient(m_elasticsearchOptions.ConnectionSettings);

var existingIndex = m_client.Indices.Get(writeRelation.NamedObject.DotSeperated);
IndexState? indexState = default;
IProperties? properties = null;
if (existingIndex != null && existingIndex.IsValid && existingIndex.Indices.TryGetValue(writeRelation.NamedObject.DotSeperated, out indexState))
{
properties = indexState.Mappings.Properties ?? new Properties();
}
else
{
properties = new Properties();
}

if (m_elasticsearchOptions.CustomMappings != null)
{
m_elasticsearchOptions.CustomMappings(properties);
}

if (indexState == null)
{
var response = m_client.Indices.Create(writeRelation.NamedObject.DotSeperated);
if (!response.IsValid)
{
throw new InvalidOperationException(response.ServerError.Error.Reason);
}
}

var mapResponse = m_client.Map(new PutMappingRequest(writeRelation.NamedObject.DotSeperated)
{
Properties = properties
});

if (!mapResponse.IsValid)
{
throw new InvalidOperationException(mapResponse.ServerError.Error.Reason);
}
}

protected override async Task<MetadataResult> SetupAndLoadMetadataAsync()
{
m_client = new ElasticClient(m_elasticsearchOptions.ConnectionSettings);
var existingIndex = await m_client.Indices.GetAsync(writeRelation.NamedObject.DotSeperated);
existingIndex.Indices.TryGetValue(writeRelation.NamedObject.DotSeperated, out var index);

return new MetadataResult(m_primaryKeys);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ public class ElasticSearchFixture : IAsyncLifetime
private IContainer? container;
public async Task DisposeAsync()
{
await container.DisposeAsync();
if (container != null)
{
await container.DisposeAsync();
}

}

private sealed class WaitUntil : IWaitUntil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using FlowtideDotNet.AcceptanceTests.Internal;
using FlowtideDotNet.Connector.CosmosDB.Tests;
using FlowtideDotNet.Core.Engine;
using Nest;
using System;
using System.Collections.Generic;
using System.Linq;
Expand All @@ -24,15 +25,17 @@ namespace FlowtideDotNet.Connector.ElasticSearch.Tests
internal class ElasticsearchTestStream : FlowtideTestStream
{
private readonly ElasticSearchFixture elasticSearchFixture;
private readonly Action<IProperties>? customMapping;

public ElasticsearchTestStream(ElasticSearchFixture elasticSearchFixture, string testName) : base(testName)
public ElasticsearchTestStream(ElasticSearchFixture elasticSearchFixture, string testName, Action<IProperties>? customMapping = null) : base(testName)
{
this.elasticSearchFixture = elasticSearchFixture;
this.customMapping = customMapping;
}

protected override void AddWriteResolvers(ReadWriteFactory factory)
{
factory.AddElasticsearchSink("*", elasticSearchFixture.GetConnectionSettings());
factory.AddElasticsearchSink("*", elasticSearchFixture.GetConnectionSettings(), customMappings: customMapping);
}
}
}
117 changes: 117 additions & 0 deletions tests/FlowtideDotNet.Connector.ElasticSearch.Tests/SinkTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,122 @@ FROM users
} while (!success);

}

[Fact]
public async Task TestInsertWithCustomMappingIndexDoesNotExist()
{
ElasticsearchTestStream stream = new ElasticsearchTestStream(elasticSearchFixture, "TestInsert", (properties) =>
{
properties["FirstName"] = new KeywordProperty();
});
stream.Generate();
await stream.StartStream(@"
INSERT INTO testindex
SELECT
UserKey as _id,
FirstName,
LastName,
UserKey as pk
FROM users
");

ElasticClient elasticClient = new ElasticClient(elasticSearchFixture.GetConnectionSettings());

bool success = false;
do
{
var resp = await elasticClient.LowLevel.GetAsync<StringResponse>("testindex", "5");
success = resp.ApiCall.HttpStatusCode == 200;
await Task.Delay(10);
} while (!success);

}

[Fact]
public async Task TestInsertWithCustomMappingIndexExistsWithNoMappings()
{
ElasticClient elasticClient = new ElasticClient(elasticSearchFixture.GetConnectionSettings());
elasticClient.Indices.Create("testindex");
ElasticsearchTestStream stream = new ElasticsearchTestStream(elasticSearchFixture, "TestInsert", (properties) =>
{
properties["FirstName"] = new KeywordProperty();
});
stream.Generate();
await stream.StartStream(@"
INSERT INTO testindex
SELECT
UserKey as _id,
FirstName,
LastName,
UserKey as pk
FROM users
");

bool success = false;
do
{
var resp = await elasticClient.LowLevel.GetAsync<StringResponse>("testindex", "5");
success = resp.ApiCall.HttpStatusCode == 200;
await Task.Delay(10);
} while (!success);
}

[Fact]
public async Task TestInsertWithCustomMappingIndexExistsWithMappings()
{
ElasticClient elasticClient = new ElasticClient(elasticSearchFixture.GetConnectionSettings());
elasticClient.Indices.Create("testindex", c => c.Map(m => m.Properties(p => p.Keyword(k => k.Name("FirstName")))));
ElasticsearchTestStream stream = new ElasticsearchTestStream(elasticSearchFixture, "TestInsert", (properties) =>
{
properties["FirstName"] = new KeywordProperty();
});
stream.Generate();
await stream.StartStream(@"
INSERT INTO testindex
SELECT
UserKey as _id,
FirstName,
LastName,
UserKey as pk
FROM users
");

bool success = false;
do
{
var resp = await elasticClient.LowLevel.GetAsync<StringResponse>("testindex", "5");
success = resp.ApiCall.HttpStatusCode == 200;
await Task.Delay(10);
} while (!success);

}

[Fact]
public async Task TestInsertWithCustomMappingIndexExistsWithMappingsCollision()
{
ElasticClient elasticClient = new ElasticClient(elasticSearchFixture.GetConnectionSettings());
elasticClient.Indices.Delete("testindex");
elasticClient.Indices.Create("testindex", c => c.Map(m => m.Properties(p => p.Text(k => k.Name("FirstName")))));
ElasticsearchTestStream stream = new ElasticsearchTestStream(elasticSearchFixture, "TestInsert", (properties) =>
{
properties["FirstName"] = new KeywordProperty();
});
stream.Generate();

var ex = await Assert.ThrowsAsync<InvalidOperationException>(async () =>
{
await stream.StartStream(@"
INSERT INTO testindex
SELECT
UserKey as _id,
FirstName,
LastName,
UserKey as pk
FROM users
");
});

Assert.Equal("mapper [FirstName] cannot be changed from type [text] to [keyword]", ex.Message);
}
}
}

0 comments on commit a8f6cf8

Please sign in to comment.