diff --git a/src/FlowtideDotNet.Connector.ElasticSearch/Extensions/FlowtideElasticsearchReadWriteFactoryExtensions.cs b/src/FlowtideDotNet.Connector.ElasticSearch/Extensions/FlowtideElasticsearchReadWriteFactoryExtensions.cs index efa516941..3686012ac 100644 --- a/src/FlowtideDotNet.Connector.ElasticSearch/Extensions/FlowtideElasticsearchReadWriteFactoryExtensions.cs +++ b/src/FlowtideDotNet.Connector.ElasticSearch/Extensions/FlowtideElasticsearchReadWriteFactoryExtensions.cs @@ -25,7 +25,12 @@ namespace FlowtideDotNet.Core.Engine { public static class FlowtideElasticsearchReadWriteFactoryExtensions { - public static ReadWriteFactory AddElasticsearchSink(this ReadWriteFactory factory, string regexPattern, ConnectionSettings options, Action? transform = null) + public static ReadWriteFactory AddElasticsearchSink( + this ReadWriteFactory factory, + string regexPattern, + ConnectionSettings options, + Action? customMappings = null, + Action? transform = null) { if (regexPattern == "*") { @@ -42,10 +47,13 @@ public static ReadWriteFactory AddElasticsearchSink(this ReadWriteFactory factor FlowtideElasticsearchOptions flowtideElasticsearchOptions = new FlowtideElasticsearchOptions() { - ConnectionSettings = options + ConnectionSettings = options, + CustomMappings = customMappings }; - return new ElasticSearchSink(writeRel, flowtideElasticsearchOptions, Operators.Write.ExecutionMode.OnCheckpoint, opt); + var sink = new ElasticSearchSink(writeRel, flowtideElasticsearchOptions, Operators.Write.ExecutionMode.OnCheckpoint, opt); + sink.CreateIndexAndMappings(); + return sink; }); return factory; } diff --git a/src/FlowtideDotNet.Connector.ElasticSearch/FlowtideElasticsearchOptions.cs b/src/FlowtideDotNet.Connector.ElasticSearch/FlowtideElasticsearchOptions.cs index 33af7634d..e816aa9fd 100644 --- a/src/FlowtideDotNet.Connector.ElasticSearch/FlowtideElasticsearchOptions.cs +++ b/src/FlowtideDotNet.Connector.ElasticSearch/FlowtideElasticsearchOptions.cs @@ -5,5 +5,13 @@ namespace FlowtideDotNet.Connector.ElasticSearch public class FlowtideElasticsearchOptions { public ConnectionSettings? ConnectionSettings { get; set; } + + /// + /// Action to apply custom mappings to the index + /// This will be called on startup. + /// + /// If the index does not exist the properties will be empty. + /// + public Action? CustomMappings { get; set; } } } \ No newline at end of file diff --git a/src/FlowtideDotNet.Connector.ElasticSearch/Internal/ElasticSearchSink.cs b/src/FlowtideDotNet.Connector.ElasticSearch/Internal/ElasticSearchSink.cs index b883ca87d..f1b226087 100644 --- a/src/FlowtideDotNet.Connector.ElasticSearch/Internal/ElasticSearchSink.cs +++ b/src/FlowtideDotNet.Connector.ElasticSearch/Internal/ElasticSearchSink.cs @@ -62,12 +62,50 @@ private int FindUnderscoreIdField(WriteRelation writeRelation) public override string DisplayName => m_displayName; + internal void CreateIndexAndMappings() + { + var m_client = new ElasticClient(m_elasticsearchOptions.ConnectionSettings); + + var existingIndex = m_client.Indices.Get(writeRelation.NamedObject.DotSeperated); + IndexState? indexState = default; + IProperties? properties = null; + if (existingIndex != null && existingIndex.IsValid && existingIndex.Indices.TryGetValue(writeRelation.NamedObject.DotSeperated, out indexState)) + { + properties = indexState.Mappings.Properties ?? new Properties(); + } + else + { + properties = new Properties(); + } + + if (m_elasticsearchOptions.CustomMappings != null) + { + m_elasticsearchOptions.CustomMappings(properties); + } + + if (indexState == null) + { + var response = m_client.Indices.Create(writeRelation.NamedObject.DotSeperated); + if (!response.IsValid) + { + throw new InvalidOperationException(response.ServerError.Error.Reason); + } + } + + var mapResponse = m_client.Map(new PutMappingRequest(writeRelation.NamedObject.DotSeperated) + { + Properties = properties + }); + + if (!mapResponse.IsValid) + { + throw new InvalidOperationException(mapResponse.ServerError.Error.Reason); + } + } + protected override async Task SetupAndLoadMetadataAsync() { m_client = new ElasticClient(m_elasticsearchOptions.ConnectionSettings); - var existingIndex = await m_client.Indices.GetAsync(writeRelation.NamedObject.DotSeperated); - existingIndex.Indices.TryGetValue(writeRelation.NamedObject.DotSeperated, out var index); - return new MetadataResult(m_primaryKeys); } diff --git a/tests/FlowtideDotNet.Connector.ElasticSearch.Tests/ElasticSearchFixture.cs b/tests/FlowtideDotNet.Connector.ElasticSearch.Tests/ElasticSearchFixture.cs index 955287db8..535082823 100644 --- a/tests/FlowtideDotNet.Connector.ElasticSearch.Tests/ElasticSearchFixture.cs +++ b/tests/FlowtideDotNet.Connector.ElasticSearch.Tests/ElasticSearchFixture.cs @@ -29,7 +29,11 @@ public class ElasticSearchFixture : IAsyncLifetime private IContainer? container; public async Task DisposeAsync() { - await container.DisposeAsync(); + if (container != null) + { + await container.DisposeAsync(); + } + } private sealed class WaitUntil : IWaitUntil diff --git a/tests/FlowtideDotNet.Connector.ElasticSearch.Tests/ElasticsearchTestStream.cs b/tests/FlowtideDotNet.Connector.ElasticSearch.Tests/ElasticsearchTestStream.cs index e81620e9a..4db55785d 100644 --- a/tests/FlowtideDotNet.Connector.ElasticSearch.Tests/ElasticsearchTestStream.cs +++ b/tests/FlowtideDotNet.Connector.ElasticSearch.Tests/ElasticsearchTestStream.cs @@ -13,6 +13,7 @@ using FlowtideDotNet.AcceptanceTests.Internal; using FlowtideDotNet.Connector.CosmosDB.Tests; using FlowtideDotNet.Core.Engine; +using Nest; using System; using System.Collections.Generic; using System.Linq; @@ -24,15 +25,17 @@ namespace FlowtideDotNet.Connector.ElasticSearch.Tests internal class ElasticsearchTestStream : FlowtideTestStream { private readonly ElasticSearchFixture elasticSearchFixture; + private readonly Action? customMapping; - public ElasticsearchTestStream(ElasticSearchFixture elasticSearchFixture, string testName) : base(testName) + public ElasticsearchTestStream(ElasticSearchFixture elasticSearchFixture, string testName, Action? customMapping = null) : base(testName) { this.elasticSearchFixture = elasticSearchFixture; + this.customMapping = customMapping; } protected override void AddWriteResolvers(ReadWriteFactory factory) { - factory.AddElasticsearchSink("*", elasticSearchFixture.GetConnectionSettings()); + factory.AddElasticsearchSink("*", elasticSearchFixture.GetConnectionSettings(), customMappings: customMapping); } } } diff --git a/tests/FlowtideDotNet.Connector.ElasticSearch.Tests/SinkTests.cs b/tests/FlowtideDotNet.Connector.ElasticSearch.Tests/SinkTests.cs index 8e6177c45..1286ccdc8 100644 --- a/tests/FlowtideDotNet.Connector.ElasticSearch.Tests/SinkTests.cs +++ b/tests/FlowtideDotNet.Connector.ElasticSearch.Tests/SinkTests.cs @@ -39,5 +39,122 @@ FROM users } while (!success); } + + [Fact] + public async Task TestInsertWithCustomMappingIndexDoesNotExist() + { + ElasticsearchTestStream stream = new ElasticsearchTestStream(elasticSearchFixture, "TestInsert", (properties) => + { + properties["FirstName"] = new KeywordProperty(); + }); + stream.Generate(); + await stream.StartStream(@" + INSERT INTO testindex + SELECT + UserKey as _id, + FirstName, + LastName, + UserKey as pk + FROM users + "); + + ElasticClient elasticClient = new ElasticClient(elasticSearchFixture.GetConnectionSettings()); + + bool success = false; + do + { + var resp = await elasticClient.LowLevel.GetAsync("testindex", "5"); + success = resp.ApiCall.HttpStatusCode == 200; + await Task.Delay(10); + } while (!success); + + } + + [Fact] + public async Task TestInsertWithCustomMappingIndexExistsWithNoMappings() + { + ElasticClient elasticClient = new ElasticClient(elasticSearchFixture.GetConnectionSettings()); + elasticClient.Indices.Create("testindex"); + ElasticsearchTestStream stream = new ElasticsearchTestStream(elasticSearchFixture, "TestInsert", (properties) => + { + properties["FirstName"] = new KeywordProperty(); + }); + stream.Generate(); + await stream.StartStream(@" + INSERT INTO testindex + SELECT + UserKey as _id, + FirstName, + LastName, + UserKey as pk + FROM users + "); + + bool success = false; + do + { + var resp = await elasticClient.LowLevel.GetAsync("testindex", "5"); + success = resp.ApiCall.HttpStatusCode == 200; + await Task.Delay(10); + } while (!success); + } + + [Fact] + public async Task TestInsertWithCustomMappingIndexExistsWithMappings() + { + ElasticClient elasticClient = new ElasticClient(elasticSearchFixture.GetConnectionSettings()); + elasticClient.Indices.Create("testindex", c => c.Map(m => m.Properties(p => p.Keyword(k => k.Name("FirstName"))))); + ElasticsearchTestStream stream = new ElasticsearchTestStream(elasticSearchFixture, "TestInsert", (properties) => + { + properties["FirstName"] = new KeywordProperty(); + }); + stream.Generate(); + await stream.StartStream(@" + INSERT INTO testindex + SELECT + UserKey as _id, + FirstName, + LastName, + UserKey as pk + FROM users + "); + + bool success = false; + do + { + var resp = await elasticClient.LowLevel.GetAsync("testindex", "5"); + success = resp.ApiCall.HttpStatusCode == 200; + await Task.Delay(10); + } while (!success); + + } + + [Fact] + public async Task TestInsertWithCustomMappingIndexExistsWithMappingsCollision() + { + ElasticClient elasticClient = new ElasticClient(elasticSearchFixture.GetConnectionSettings()); + elasticClient.Indices.Delete("testindex"); + elasticClient.Indices.Create("testindex", c => c.Map(m => m.Properties(p => p.Text(k => k.Name("FirstName"))))); + ElasticsearchTestStream stream = new ElasticsearchTestStream(elasticSearchFixture, "TestInsert", (properties) => + { + properties["FirstName"] = new KeywordProperty(); + }); + stream.Generate(); + + var ex = await Assert.ThrowsAsync(async () => + { + await stream.StartStream(@" + INSERT INTO testindex + SELECT + UserKey as _id, + FirstName, + LastName, + UserKey as pk + FROM users + "); + }); + + Assert.Equal("mapper [FirstName] cannot be changed from type [text] to [keyword]", ex.Message); + } } } \ No newline at end of file