From dbab878a15941026dd48c898d28a21e480e04960 Mon Sep 17 00:00:00 2001
From: Ulimo
Date: Sat, 6 Jan 2024 15:10:39 +0100
Subject: [PATCH] Generic read source (#266)

* Add generic read source
* Add test
* Fix bug
* Fix code smells
* Fix duplicate code
* Add docs
* Remove full load interval in test
* Check if json document is null
---
 docs/docs/connectors/customdata.md | 100 +++++
 .../FlowtideKafkaJsonKeyDeserializer.cs | 8 +-
 .../FlowtideKafkaUpsertJsonDeserializer.cs | 61 +--
 .../Flexbuffer/JsonSerializerUtils.cs | 88 ++++
 .../Read/BatchableReadBaseOperator.cs | 389 ++++++++++++++++++
 .../Sources/Generic/FlowtideGenericObject.cs | 51 +++
 .../Sources/Generic/GenericDataSource.cs | 62 +++
 .../GenericReadWriteFactoryExtensions.cs | 44 ++
 .../Generic/Internal/GenericReadOperator.cs | 61 +++
 .../Generic/Internal/ObjectToRowEvent.cs | 60 +++
 .../Internal/FlowtideTestStream.cs | 5 +
 .../FlowtideDotNet.Core.Tests.csproj | 3 +-
 .../GenericReadOperatorTests.cs | 146 +++++++
 13 files changed, 1017 insertions(+), 61 deletions(-)
 create mode 100644 docs/docs/connectors/customdata.md
 create mode 100644 src/FlowtideDotNet.Core/Flexbuffer/JsonSerializerUtils.cs
 create mode 100644 src/FlowtideDotNet.Core/Operators/Read/BatchableReadBaseOperator.cs
 create mode 100644 src/FlowtideDotNet.Core/Sources/Generic/FlowtideGenericObject.cs
 create mode 100644 src/FlowtideDotNet.Core/Sources/Generic/GenericDataSource.cs
 create mode 100644 src/FlowtideDotNet.Core/Sources/Generic/GenericReadWriteFactoryExtensions.cs
 create mode 100644 src/FlowtideDotNet.Core/Sources/Generic/Internal/GenericReadOperator.cs
 create mode 100644 src/FlowtideDotNet.Core/Sources/Generic/Internal/ObjectToRowEvent.cs
 create mode 100644 tests/FlowtideDotNet.Core.Tests/GenericDataTests/GenericReadOperatorTests.cs

diff --git a/docs/docs/connectors/customdata.md b/docs/docs/connectors/customdata.md
new file mode 100644
index 000000000..d8506a02b
--- /dev/null
+++ b/docs/docs/connectors/customdata.md
@@ -0,0 +1,100 @@
+---
+sidebar_position: 6
+---
+
+# Custom Data Source
+
+It is possible to fetch data from a custom source such as an API, another database, a file system, etc.
+
+There are multiple ways:
+
+* **Implement GenericDataSource(Async)** - A simplified source that returns C# objects. This is the recommended way to start implementing a source.
+* **Implement ReadBaseOperator** - This allows the creation of a low-level read operator, where serialization, state storage, watermarks and checkpointing must be handled.
+
+## Generic Data Source
+
+The generic data source allows easy implementation against custom sources that return C# objects.
+
+It allows:
+
+* Full batch reloads, where all the data is imported again and the delta is computed.
+* Delta loads, where only the delta since the last watermark should be returned.
+* Custom watermarks provided by the source.
+* Scheduling of full batch and delta reloads.
+
+There are two classes that can be implemented for the generic data source:
+
+* **GenericDataSourceAsync** - Data is returned by an IAsyncEnumerable; this should be used with remote sources.
+* **GenericDataSource** - Data is returned by an IEnumerable, which should be used in cases where the data is already in memory.
+
+When implementing a generic data source, it is important to think about memory usage. For instance, do not
+fetch all rows, store them in memory and then return them; this can cause large memory spikes or out-of-memory errors.
+Instead, yield return values and fetch the data in batches. The operator stores the data in B+ trees that will be
+temporarily stored on disk if the memory usage is too high.
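+
+Each object returned from a load is a *FlowtideGenericObject*: it carries a unique key, the value object
+(null for deletes), a watermark and a delete flag. The highest watermark seen during a load is stored by the
+operator and passed back as *lastWatermark* on the next delta load. A minimal construction example (the *User*
+type and *user* variable are illustrations only):
+
+```csharp
+var row = new FlowtideGenericObject<User>("user-1", user, watermark: 123, isDelete: false);
+```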
+
+### Implementation example
+
+```csharp
+public class ExampleDataSource : GenericDataSourceAsync<User>
+{
+    private readonly IUserRepository _userRepository;
+
+    public ExampleDataSource(IUserRepository userRepository)
+    {
+        _userRepository = userRepository;
+    }
+
+    // Fetch delta every 1 second
+    public override TimeSpan? DeltaLoadInterval => TimeSpan.FromSeconds(1);
+
+    // Reload all data every hour. This is not required, but can be useful.
+    // If, for instance, deletes can't be found in deltas from the source,
+    // a full reload will find all deleted rows.
+    public override TimeSpan? FullLoadInterval => TimeSpan.FromHours(1);
+
+    public override async IAsyncEnumerable<FlowtideGenericObject<User>> DeltaLoadAsync(long lastWatermark)
+    {
+        var changes = _userRepository.GetChangesFromWatermarkAsync(lastWatermark);
+
+        await foreach (var change in changes)
+        {
+            yield return new FlowtideGenericObject<User>(change.Id, change, change.Timestamp, false);
+        }
+    }
+
+    public override async IAsyncEnumerable<FlowtideGenericObject<User>> FullLoadAsync()
+    {
+        var data = _userRepository.GetAllDataAsync();
+
+        await foreach (var row in data)
+        {
+            yield return new FlowtideGenericObject<User>(row.Id, row, row.Timestamp, false);
+        }
+    }
+}
+```
+
+To use your data source, add the following to the *ReadWriteFactory*:
+
+```csharp
+factory.AddGenericDataSource<User>(
+    "{regex for the table name}",
+    (readRelation) => new ExampleDataSource(userRepository));
+```
+
+### Trigger data reloads programmatically
+
+The generic data source also registers triggers that allow the user to notify the stream when a reload should happen.
+
+The following triggers are registered:
+
+* **full_load** - Does a full load on all running generic data sources
+* **delta_load** - Does a delta load on all running generic data sources
+* **full_load_\{tableName\}** - Full load for a specific source
+* **delta_load_\{tableName\}** - Delta load for a specific source
+
+Example of calling a trigger:
+
+```csharp
+await stream.CallTrigger("delta_load", default);
+```
+
+Calling the triggers programmatically can be useful if relying on an interval alone would add too much latency to the data.
\ No newline at end of file
diff --git a/src/FlowtideDotNet.Connector.Kafka/FlowtideKafkaJsonKeyDeserializer.cs b/src/FlowtideDotNet.Connector.Kafka/FlowtideKafkaJsonKeyDeserializer.cs
index 80c3a78ae..f213dc88c 100644
--- a/src/FlowtideDotNet.Connector.Kafka/FlowtideKafkaJsonKeyDeserializer.cs
+++ b/src/FlowtideDotNet.Connector.Kafka/FlowtideKafkaJsonKeyDeserializer.cs
@@ -11,6 +11,7 @@
 // limitations under the License.
using FlexBuffers; +using FlowtideDotNet.Core.Flexbuffer; using System.Text.Json; namespace FlowtideDotNet.Connector.Kafka @@ -22,7 +23,12 @@ public FlxValue Deserialize(byte[] bytes) var jsonDocument = JsonSerializer.Deserialize(bytes); - return FlowtideKafkaUpsertJsonDeserializer.JsonElementToValue(jsonDocument.RootElement); + if (jsonDocument == null) + { + return FlxValue.FromBytes(FlexBuffer.Null()); + } + + return JsonSerializerUtils.JsonElementToValue(jsonDocument.RootElement); } } } diff --git a/src/FlowtideDotNet.Connector.Kafka/FlowtideKafkaUpsertJsonDeserializer.cs b/src/FlowtideDotNet.Connector.Kafka/FlowtideKafkaUpsertJsonDeserializer.cs index 2534cdaf5..2314fc2e5 100644 --- a/src/FlowtideDotNet.Connector.Kafka/FlowtideKafkaUpsertJsonDeserializer.cs +++ b/src/FlowtideDotNet.Connector.Kafka/FlowtideKafkaUpsertJsonDeserializer.cs @@ -12,6 +12,7 @@ using FlexBuffers; using FlowtideDotNet.Core; +using FlowtideDotNet.Core.Flexbuffer; using FlowtideDotNet.Substrait.Relations; using System.Text.Json; @@ -53,7 +54,7 @@ public RowEvent Deserialize(IFlowtideKafkaKeyDeserializer keyDeserializer, byte[ } else if (jsonDocument.TryGetProperty(_names[i], out var property)) { - b.Add(JsonElementToValue(property)); + b.Add(JsonSerializerUtils.JsonElementToValue(property)); } else { @@ -64,64 +65,6 @@ public RowEvent Deserialize(IFlowtideKafkaKeyDeserializer keyDeserializer, byte[ } } - internal static FlxValue JsonElementToValue(JsonElement jsonElement) - { - if (jsonElement.ValueKind == JsonValueKind.Null) - { - return FlxValue.FromBytes(FlexBuffer.Null()); - } - if (jsonElement.ValueKind == JsonValueKind.True) - { - return FlxValue.FromBytes(FlexBuffer.SingleValue(true)); - } - if (jsonElement.ValueKind == JsonValueKind.False) - { - return FlxValue.FromBytes(FlexBuffer.SingleValue(false)); - } - if (jsonElement.ValueKind == JsonValueKind.Number) - { - if (jsonElement.TryGetInt64(out var value)) - { - return FlxValue.FromBytes(FlexBuffer.SingleValue(value)); - } - else if (jsonElement.TryGetDouble(out var doubleValue)) - { - return FlxValue.FromBytes(FlexBuffer.SingleValue(doubleValue)); - } - else - { - throw new NotImplementedException(); - } - } - if (jsonElement.ValueKind == JsonValueKind.String) - { - return FlxValue.FromBytes(FlexBuffer.SingleValue(jsonElement.GetString())); - } - if (jsonElement.ValueKind == JsonValueKind.Array) - { - var bytes = FlexBufferBuilder.Vector(v => - { - foreach (var item in jsonElement.EnumerateArray()) - { - v.Add(JsonElementToValue(item)); - } - }); - return FlxValue.FromBytes(bytes); - } - if (jsonElement.ValueKind == JsonValueKind.Object) - { - var bytes = FlexBufferBuilder.Map(m => - { - foreach (var item in jsonElement.EnumerateObject()) - { - m.Add(item.Name, JsonElementToValue(item.Value)); - } - }); - return FlxValue.FromBytes(bytes); - } - throw new NotImplementedException(); - } - public Task Initialize(ReadRelation readRelation) { _names = readRelation.BaseSchema.Names; diff --git a/src/FlowtideDotNet.Core/Flexbuffer/JsonSerializerUtils.cs b/src/FlowtideDotNet.Core/Flexbuffer/JsonSerializerUtils.cs new file mode 100644 index 000000000..723ccb8ea --- /dev/null +++ b/src/FlowtideDotNet.Core/Flexbuffer/JsonSerializerUtils.cs @@ -0,0 +1,88 @@ +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using FlexBuffers; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Text.Json; +using System.Threading.Tasks; + +namespace FlowtideDotNet.Core.Flexbuffer +{ + public static class JsonSerializerUtils + { + public static FlxValue JsonElementToValue(JsonElement jsonElement) + { + if (jsonElement.ValueKind == JsonValueKind.Null) + { + return FlxValue.FromBytes(FlexBuffer.Null()); + } + if (jsonElement.ValueKind == JsonValueKind.True) + { + return FlxValue.FromBytes(FlexBuffer.SingleValue(true)); + } + if (jsonElement.ValueKind == JsonValueKind.False) + { + return FlxValue.FromBytes(FlexBuffer.SingleValue(false)); + } + if (jsonElement.ValueKind == JsonValueKind.Number) + { + if (jsonElement.TryGetInt64(out var value)) + { + return FlxValue.FromBytes(FlexBuffer.SingleValue(value)); + } + else if (jsonElement.TryGetDouble(out var doubleValue)) + { + return FlxValue.FromBytes(FlexBuffer.SingleValue(doubleValue)); + } + else + { + throw new NotImplementedException(); + } + } + if (jsonElement.ValueKind == JsonValueKind.String) + { + var str = jsonElement.GetString(); + if (str == null) + { + return FlxValue.FromBytes(FlexBuffer.Null()); + } + return FlxValue.FromBytes(FlexBuffer.SingleValue(str)); + } + if (jsonElement.ValueKind == JsonValueKind.Array) + { + var bytes = FlexBufferBuilder.Vector(v => + { + foreach (var item in jsonElement.EnumerateArray()) + { + v.Add(JsonElementToValue(item)); + } + }); + return FlxValue.FromBytes(bytes); + } + if (jsonElement.ValueKind == JsonValueKind.Object) + { + var bytes = FlexBufferBuilder.Map(m => + { + foreach (var item in jsonElement.EnumerateObject()) + { + m.Add(item.Name, JsonElementToValue(item.Value)); + } + }); + return FlxValue.FromBytes(bytes); + } + throw new NotImplementedException(); + } + } +} diff --git a/src/FlowtideDotNet.Core/Operators/Read/BatchableReadBaseOperator.cs b/src/FlowtideDotNet.Core/Operators/Read/BatchableReadBaseOperator.cs new file mode 100644 index 000000000..dd6497a58 --- /dev/null +++ b/src/FlowtideDotNet.Core/Operators/Read/BatchableReadBaseOperator.cs @@ -0,0 +1,389 @@ +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +using FlowtideDotNet.Base.Vertices.Ingress; +using FlowtideDotNet.Core.Storage; +using FlowtideDotNet.Storage.Serializers; +using FlowtideDotNet.Storage.StateManager; +using FlowtideDotNet.Storage.Tree; +using FlowtideDotNet.Substrait.Relations; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Security.Cryptography; +using System.Text; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; + +namespace FlowtideDotNet.Core.Operators.Read +{ + public class BatchableReadOperatorState + { + public long LastWatermark { get; set; } + } + + public record struct BatchableReadEvent(string Key, RowEvent RowEvent, long Watermark); + + public abstract class BatchableReadBaseOperator : ReadBaseOperator + { + /// + /// Temporary tree used to store the full load data + /// + private IBPlusTree? _fullLoadTempTree; + + /// + /// Persistent tree used to store the data + /// + private IBPlusTree? _persistentTree; + + /// + /// Tree used to store the deletions for the data in full load + /// + private IBPlusTree? _deletionsTree; + private BatchableReadOperatorState? _state; + private readonly string _watermarkName; + protected BatchableReadBaseOperator(ReadRelation readRelation, DataflowBlockOptions options) : base(options) + { + _watermarkName = readRelation.NamedTable.DotSeperated; + } + + public override string DisplayName => "Generic"; + + public override Task DeleteAsync() + { + return Task.CompletedTask; + } + + public override Task OnTrigger(string triggerName, object? state) + { + switch (triggerName) + { + case "full_load": + return RunTask(FullLoadTrigger); + case "delta_load": + return RunTask(DeltaLoadTrigger); + default: + if (triggerName.Equals($"delta_load_{_watermarkName}")) + { + return RunTask(DeltaLoadTrigger); + } + else if (triggerName.Equals($"full_load_{_watermarkName}")) + { + return RunTask(FullLoadTrigger); + } + break; + } + return Task.CompletedTask; + } + + private async Task FullLoadTrigger(IngressOutput output, object? state) + { + await DoFullLoad(output); + } + + private async Task DeltaLoadTrigger(IngressOutput output, object? state) + { + await DoDeltaLoad(output); + } + + protected override async Task InitializeOrRestore(long restoreTime, BatchableReadOperatorState? 
state, IStateManagerClient stateManagerClient) + { + if (state != null) + { + _state = state; + } + else + { + _state = new BatchableReadOperatorState() + { + LastWatermark = -1 + }; + } + + _fullLoadTempTree = await stateManagerClient.GetOrCreateTree("full_load_temp", new BPlusTreeOptions() + { + Comparer = StringComparer.Ordinal, + KeySerializer = new StringSerializer(), + ValueSerializer = new StreamEventBPlusTreeSerializer() + }); + await _fullLoadTempTree.Clear(); + + _persistentTree = await stateManagerClient.GetOrCreateTree("persistent", new BPlusTreeOptions() + { + Comparer = StringComparer.Ordinal, + KeySerializer = new StringSerializer(), + ValueSerializer = new StreamEventBPlusTreeSerializer() + }); + + _deletionsTree = await stateManagerClient.GetOrCreateTree("deletions", new BPlusTreeOptions() + { + Comparer = StringComparer.Ordinal, + KeySerializer = new StringSerializer(), + ValueSerializer = new IntSerializer() + }); + await _deletionsTree.Clear(); + } + + private static async IAsyncEnumerable> IteratePerRow(IBPlusTreeIterator iterator) + { + await foreach (var page in iterator) + { + foreach (var kv in page) + { + yield return kv; + } + } + } + + private async Task DoDeltaLoad(IngressOutput output) + { + Debug.Assert(_persistentTree != null, nameof(_persistentTree)); + Debug.Assert(_state != null, nameof(_state)); + await output.EnterCheckpointLock(); + long maxWatermark = _state.LastWatermark; + List outputList = new List(); + bool sentUpdates = false; + await foreach (var e in DeltaLoad(_state.LastWatermark)) + { + var key = e.Key; + if (e.RowEvent.Weight < 0) + { + await _persistentTree.RMW(key, e.RowEvent, (input, current, exists) => + { + if (exists) + { + outputList.Add(new RowEvent(-1, 0, current.RowData)); + return (current, GenericWriteOperation.Delete); + } + return (input, GenericWriteOperation.None); + }); + } + else + { + await _persistentTree.RMW(key, e.RowEvent, (input, current, exists) => + { + if (exists) + { + outputList.Add(new RowEvent(-1, 0, current.RowData)); + outputList.Add(new RowEvent(1, 0, input.RowData)); + return (input, GenericWriteOperation.Upsert); + } + outputList.Add(new RowEvent(1, 0, input.RowData)); + return (input, GenericWriteOperation.Upsert); + }); + } + + maxWatermark = Math.Max(maxWatermark, e.Watermark); + + if (outputList.Count > 100) + { + await output.SendAsync(new StreamEventBatch(outputList)); + outputList = new List(); + sentUpdates = true; + } + } + if (outputList.Count > 0) + { + await output.SendAsync(new StreamEventBatch(outputList)); + outputList = new List(); + sentUpdates = true; + } + if (sentUpdates) + { + await output.SendWatermark(new Base.Watermark(_watermarkName, maxWatermark)); + } + output.ExitCheckpointLock(); + ScheduleCheckpoint(TimeSpan.FromMilliseconds(1)); + } + + private async Task DoFullLoad(IngressOutput output) + { + Debug.Assert(_fullLoadTempTree != null, nameof(_fullLoadTempTree)); + Debug.Assert(_persistentTree != null, nameof(_persistentTree)); + Debug.Assert(_deletionsTree != null, nameof(_deletionsTree)); + Debug.Assert(_state != null, nameof(_state)); + + // Lock checkpointing until the full load is complete + await output.EnterCheckpointLock(); + + long maxWatermark = 0; + List outputList = new List(); + + await foreach (var e in FullLoad()) + { + if (e.RowEvent.Weight < 0) + { + throw new NotSupportedException("Full load does not support deletions"); + } + var key = e.Key; + await _fullLoadTempTree.Upsert(key, e.RowEvent); + await _persistentTree.RMW(key, e.RowEvent, (input, current, exist) 
=> + { + if (exist) + { + if (RowEvent.Compare(input, current) != 0) + { + outputList.Add(new RowEvent(1, 0, input.RowData)); + outputList.Add(new RowEvent(-1, 0, current.RowData)); + return (input, GenericWriteOperation.Upsert); + } + else + { + return (current, GenericWriteOperation.None); + } + } + outputList.Add(new RowEvent(1, 0, input.RowData)); + return (input, GenericWriteOperation.Upsert); + }); + + maxWatermark = Math.Max(maxWatermark, e.Watermark); + + if (outputList.Count > 100) + { + await output.SendAsync(new StreamEventBatch(outputList)); + outputList = new List(); + } + } + + if (outputList.Count > 0) + { + await output.SendAsync(new StreamEventBatch(outputList)); + outputList = new List(); + } + + var tmpIterator = _fullLoadTempTree.CreateIterator(); + var persistentIterator = _persistentTree.CreateIterator(); + await tmpIterator.SeekFirst(); + await persistentIterator.SeekFirst(); + + var tmpEnumerator = IteratePerRow(tmpIterator).GetAsyncEnumerator(); + var persistentEnumerator = IteratePerRow(persistentIterator).GetAsyncEnumerator(); + + var hasNew = await tmpEnumerator.MoveNextAsync(); + var hasOld = await persistentEnumerator.MoveNextAsync(); + + // Go through both trees and find deletions + while (hasNew || hasOld) + { + int comparison = hasNew && hasOld ? tmpEnumerator.Current.Key.CompareTo(persistentEnumerator.Current.Key) : 0; + + // If there is no more old data, then we are done + if (!hasOld) + { + break; + } + if (hasNew && comparison < 0) + { + hasNew = await tmpEnumerator.MoveNextAsync(); + } + if (!hasNew || comparison > 0) + { + await _deletionsTree.Upsert(persistentEnumerator.Current.Key, 1); + // Deletion + outputList.Add(new RowEvent(-1, 0, persistentEnumerator.Current.Value.RowData)); + hasOld = await persistentEnumerator.MoveNextAsync(); + } + else + { + hasNew = await tmpEnumerator.MoveNextAsync(); + hasOld = await persistentEnumerator.MoveNextAsync(); + } + } + + // Clear the temp tree + await _fullLoadTempTree.Clear(); + + var deleteIterator = _deletionsTree.CreateIterator(); + await deleteIterator.SeekFirst(); + + await foreach (var page in deleteIterator) + { + foreach (var kv in page) + { + // Go through the deletions and delete them from the persistent tree + await _persistentTree.RMW(kv.Key, default, (input, current, exists) => + { + if (exists) + { + // Output delete event + outputList.Add(new RowEvent(-1, 0, current.RowData)); + return (current, GenericWriteOperation.Delete); + } + return (current, GenericWriteOperation.None); + }); + + if (outputList.Count > 100) + { + await output.SendAsync(new StreamEventBatch(outputList)); + outputList = new List(); + } + } + } + // Clear the deletions tree + await _deletionsTree.Clear(); + + if (outputList.Count > 0) + { + await output.SendAsync(new StreamEventBatch(outputList)); + outputList = new List(); + } + // Send the new max watermark + _state.LastWatermark = maxWatermark; + await output.SendWatermark(new Base.Watermark(_watermarkName, maxWatermark)); + + output.ExitCheckpointLock(); + ScheduleCheckpoint(TimeSpan.FromMilliseconds(1)); + } + + protected abstract IAsyncEnumerable FullLoad(); + + protected abstract IAsyncEnumerable DeltaLoad(long lastWatermark); + + protected virtual TimeSpan? GetFullLoadSchedule() + { + return default; + } + + protected abstract TimeSpan? 
GetDeltaLoadTimeSpan(); + + protected override async Task OnCheckpoint(long checkpointTime) + { + Debug.Assert(_state != null, nameof(_state)); + Debug.Assert(_persistentTree != null, nameof(_persistentTree)); + + await _persistentTree.Commit(); + return _state; + } + + protected override async Task SendInitial(IngressOutput output) + { + Debug.Assert(_state != null, nameof(_state)); + if (_state.LastWatermark < 0) + { + // Only do full load if we have not done it before + await DoFullLoad(output); + } + // Register full load trigger if the user wants to call it or schedule it + await RegisterTrigger("full_load", GetFullLoadSchedule()); + // Register delta load trigger if the user wants to call it or schedule it + await RegisterTrigger("delta_load", GetDeltaLoadTimeSpan()); + await RegisterTrigger($"delta_load_{_watermarkName}"); + await RegisterTrigger($"full_load_{_watermarkName}"); + } + + protected override Task> GetWatermarkNames() + { + return Task.FromResult>(new HashSet() { _watermarkName }); + } + } +} diff --git a/src/FlowtideDotNet.Core/Sources/Generic/FlowtideGenericObject.cs b/src/FlowtideDotNet.Core/Sources/Generic/FlowtideGenericObject.cs new file mode 100644 index 000000000..02aabb87e --- /dev/null +++ b/src/FlowtideDotNet.Core/Sources/Generic/FlowtideGenericObject.cs @@ -0,0 +1,51 @@ +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace FlowtideDotNet.Core.Sources.Generic +{ + public class FlowtideGenericObject + { + public FlowtideGenericObject(string key, T? value, long watermark, bool isDelete) + { + Key = key; + Value = value; + Watermark = watermark; + this.isDelete = isDelete; + } + + /// + /// Unique identifier of the object + /// + public string Key { get; } + + /// + /// Value object, contains the data that will be used by the stream, can be null if its a delete + /// + public T? Value { get; } + + /// + /// Watermark of the object, used to determine the order of the objects in the stream + /// + public long Watermark { get; } + + /// + /// Indicates if the object is deleted or not + /// + public bool isDelete { get; } + } +} diff --git a/src/FlowtideDotNet.Core/Sources/Generic/GenericDataSource.cs b/src/FlowtideDotNet.Core/Sources/Generic/GenericDataSource.cs new file mode 100644 index 000000000..64157445e --- /dev/null +++ b/src/FlowtideDotNet.Core/Sources/Generic/GenericDataSource.cs @@ -0,0 +1,62 @@ +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace FlowtideDotNet.Core.Sources.Generic +{ + public abstract class GenericDataSourceAsync where T : class + { + /// + /// Fetches all objects from the data source + /// + /// + public abstract IAsyncEnumerable> FullLoadAsync(); + + /// + /// Fetches all objects from the data source that have changed since the last delta load + /// + /// + public abstract IAsyncEnumerable> DeltaLoadAsync(long lastWatermark); + + /// + /// Interval between delta loads, set to null to disable delta loads + /// + public abstract TimeSpan? DeltaLoadInterval { get; } + + /// + /// Interval between full loads, set to null to disable, defaults to null + /// + public virtual TimeSpan? FullLoadInterval => default; + } + + public abstract class GenericDataSource : GenericDataSourceAsync where T : class + { + public override IAsyncEnumerable> FullLoadAsync() + { + return FullLoad().ToAsyncEnumerable(); + } + + public override IAsyncEnumerable> DeltaLoadAsync(long lastWatermark) + { + return DeltaLoad(lastWatermark).ToAsyncEnumerable(); + } + + protected abstract IEnumerable> FullLoad(); + + protected abstract IEnumerable> DeltaLoad(long lastWatermark); + } +} diff --git a/src/FlowtideDotNet.Core/Sources/Generic/GenericReadWriteFactoryExtensions.cs b/src/FlowtideDotNet.Core/Sources/Generic/GenericReadWriteFactoryExtensions.cs new file mode 100644 index 000000000..6549dc18b --- /dev/null +++ b/src/FlowtideDotNet.Core/Sources/Generic/GenericReadWriteFactoryExtensions.cs @@ -0,0 +1,44 @@ +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +using FlowtideDotNet.Core.Sources.Generic; +using FlowtideDotNet.Core.Sources.Generic.Internal; +using FlowtideDotNet.Substrait.Relations; +using System.Text.RegularExpressions; + +namespace FlowtideDotNet.Core.Engine +{ + public static class GenericReadWriteFactoryExtensions + { + public static ReadWriteFactory AddGenericDataSource(this ReadWriteFactory readWriteFactory, string regexPattern, Func> dataSource) + where T: class + { + if (regexPattern == "*") + { + regexPattern = ".*"; + } + + readWriteFactory.AddReadResolver((readRelation, opt) => + { + var regexResult = Regex.Match(readRelation.NamedTable.DotSeperated, regexPattern, RegexOptions.IgnoreCase, TimeSpan.FromSeconds(5)); + if (!regexResult.Success) + { + return null; + } + + return new ReadOperatorInfo(new GenericReadOperator(readRelation, dataSource(readRelation), opt)); + }); + + return readWriteFactory; + } + } +} diff --git a/src/FlowtideDotNet.Core/Sources/Generic/Internal/GenericReadOperator.cs b/src/FlowtideDotNet.Core/Sources/Generic/Internal/GenericReadOperator.cs new file mode 100644 index 000000000..909bca8e5 --- /dev/null +++ b/src/FlowtideDotNet.Core/Sources/Generic/Internal/GenericReadOperator.cs @@ -0,0 +1,61 @@ +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Text.Json; +using System.Threading.Tasks.Dataflow; +using FlowtideDotNet.Core.Operators.Read; +using FlowtideDotNet.Substrait.Relations; + +namespace FlowtideDotNet.Core.Sources.Generic.Internal +{ + internal class GenericReadOperator : BatchableReadBaseOperator + where T : class + { + private readonly GenericDataSourceAsync genericDataSource; + private readonly ObjectToRowEvent objectToRowEvent; + + public GenericReadOperator(ReadRelation readRelation, GenericDataSourceAsync genericDataSource, DataflowBlockOptions options) : base(readRelation, options) + { + this.genericDataSource = genericDataSource; + objectToRowEvent = new ObjectToRowEvent(readRelation); + } + + protected override async IAsyncEnumerable DeltaLoad(long lastWatermark) + { + await foreach (var ev in genericDataSource.DeltaLoadAsync(lastWatermark)) + { + var rowEvent = objectToRowEvent.Convert(ev.Value, ev.isDelete); + yield return new BatchableReadEvent(ev.Key, rowEvent, ev.Watermark); + } + } + + protected override async IAsyncEnumerable FullLoad() + { + // Read data, convert to json, create row event from the json and then send. + await foreach (var ev in genericDataSource.FullLoadAsync()) + { + var rowEvent = objectToRowEvent.Convert(ev.Value, ev.isDelete); + yield return new BatchableReadEvent(ev.Key, rowEvent, ev.Watermark); + } + } + + protected override TimeSpan? GetFullLoadSchedule() + { + return genericDataSource.FullLoadInterval; + } + + protected override TimeSpan? 
GetDeltaLoadTimeSpan() + { + return genericDataSource.DeltaLoadInterval; + } + } +} diff --git a/src/FlowtideDotNet.Core/Sources/Generic/Internal/ObjectToRowEvent.cs b/src/FlowtideDotNet.Core/Sources/Generic/Internal/ObjectToRowEvent.cs new file mode 100644 index 000000000..6808f1588 --- /dev/null +++ b/src/FlowtideDotNet.Core/Sources/Generic/Internal/ObjectToRowEvent.cs @@ -0,0 +1,60 @@ +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using FlexBuffers; +using FlowtideDotNet.Core.Flexbuffer; +using FlowtideDotNet.Substrait.Relations; +using System.Text.Json; + +namespace FlowtideDotNet.Core.Sources.Generic.Internal +{ + internal class ObjectToRowEvent + { + private readonly List _names; + private RowEvent _deleteEvent; + + public ObjectToRowEvent(ReadRelation readRelation) + { + _names = readRelation.BaseSchema.Names; + _deleteEvent = RowEvent.Create(-1, 0, b => + { + for (int i = 0; i < _names.Count; i++) + { + b.AddNull(); + } + }); + } + public RowEvent Convert(T obj, bool isDelete) + { + if (isDelete) + { + return _deleteEvent; + } + var document = JsonSerializer.SerializeToDocument(obj); + var root = document.RootElement; + return RowEvent.Create(1, 0, b => + { + for (int i = 0; i < _names.Count; i++) + { + if (root.TryGetProperty(_names[i], out var property)) + { + b.Add(JsonSerializerUtils.JsonElementToValue(property)); + } + else + { + b.AddNull(); + } + } + }); + } + } +} diff --git a/tests/FlowtideDotNet.AcceptanceTests/Internal/FlowtideTestStream.cs b/tests/FlowtideDotNet.AcceptanceTests/Internal/FlowtideTestStream.cs index aba1e9308..8a644aee1 100644 --- a/tests/FlowtideDotNet.AcceptanceTests/Internal/FlowtideTestStream.cs +++ b/tests/FlowtideDotNet.AcceptanceTests/Internal/FlowtideTestStream.cs @@ -288,5 +288,10 @@ public async ValueTask DisposeAsync() _fileCachePersistence.ForceDispose(); } } + + public async Task Trigger(string triggerName) + { + await _stream!.CallTrigger(triggerName, default); + } } } diff --git a/tests/FlowtideDotNet.Core.Tests/FlowtideDotNet.Core.Tests.csproj b/tests/FlowtideDotNet.Core.Tests/FlowtideDotNet.Core.Tests.csproj index 226f053bc..744dfd18d 100644 --- a/tests/FlowtideDotNet.Core.Tests/FlowtideDotNet.Core.Tests.csproj +++ b/tests/FlowtideDotNet.Core.Tests/FlowtideDotNet.Core.Tests.csproj @@ -1,4 +1,4 @@ - + net7.0 @@ -32,6 +32,7 @@ + diff --git a/tests/FlowtideDotNet.Core.Tests/GenericDataTests/GenericReadOperatorTests.cs b/tests/FlowtideDotNet.Core.Tests/GenericDataTests/GenericReadOperatorTests.cs new file mode 100644 index 000000000..3dc13aff2 --- /dev/null +++ b/tests/FlowtideDotNet.Core.Tests/GenericDataTests/GenericReadOperatorTests.cs @@ -0,0 +1,146 @@ +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using FlowtideDotNet.AcceptanceTests.Entities; +using FlowtideDotNet.AcceptanceTests.Internal; +using FlowtideDotNet.Core.Engine; +using FlowtideDotNet.Core.Sources.Generic; + +namespace FlowtideDotNet.Core.Tests.GenericDataTests +{ + internal class GenericDataTestStream : FlowtideTestStream + { + private readonly TestDataSource testDataSource; + + public GenericDataTestStream(TestDataSource testDataSource, string testName) : base(testName) + { + this.testDataSource = testDataSource; + } + + protected override void AddReadResolvers(ReadWriteFactory factory) + { + factory.AddGenericDataSource("*", (rel) => testDataSource); + } + } + + internal class TestDataSource : GenericDataSource + { + private readonly List> _changes = new List>(); + private readonly TimeSpan? deltaTime; + private int _index = 0; + + public TestDataSource(TimeSpan? deltaTime) + { + this.deltaTime = deltaTime; + } + + public void AddChange(FlowtideGenericObject change) + { + _changes.Add(change); + } + public override TimeSpan? DeltaLoadInterval => deltaTime; + + protected override IEnumerable> DeltaLoad(long lastWatermark) + { + for (; _index < _changes.Count; _index++) + { + if (_changes[_index].Watermark > lastWatermark) + { + yield return _changes[_index]; + } + } + } + + protected override IEnumerable> FullLoad() + { + _index = 0; + for (; _index < _changes.Count; _index++) + { + yield return _changes[_index]; + } + } + } + + public class GenericReadOperatorTests + { + [Fact] + public async Task TestGenericDataSource() + { + var source = new TestDataSource(TimeSpan.FromMilliseconds(1)); + source.AddChange(new FlowtideGenericObject("1", new User { UserKey = 1, FirstName = "Test" }, 1, false)); + + var stream = new GenericDataTestStream(source, "TestGenericDataSource"); + stream.Generate(); + + + await stream.StartStream(@" + INSERT INTO output + SELECT + UserKey, + FirstName + FROM users + "); + await stream.WaitForUpdate(); + + stream.AssertCurrentDataEqual(new List() { new User { UserKey = 1, FirstName = "Test" } }.Select(x => new {x.UserKey, x.FirstName})); + + // Update user 1 + source.AddChange(new FlowtideGenericObject("1", new User { UserKey = 1, FirstName = "Test2" }, 2, false)); + source.AddChange(new FlowtideGenericObject("2", new User { UserKey = 2, FirstName = "Test3" }, 3, false)); + await stream.WaitForUpdate(); + + stream.AssertCurrentDataEqual(new List() { new User { UserKey = 1, FirstName = "Test2" }, new User { UserKey = 2, FirstName = "Test3" } }.Select(x => new { x.UserKey, x.FirstName })); + + // Delete + source.AddChange(new FlowtideGenericObject("1", null, 4, true)); + await stream.WaitForUpdate(); + stream.AssertCurrentDataEqual(new List() { new User { UserKey = 2, FirstName = "Test3" } }.Select(x => new { x.UserKey, x.FirstName })); + + } + + [Fact] + public async Task TestDeltaTrigger() + { + var source = new TestDataSource(default); + source.AddChange(new FlowtideGenericObject("1", new User { UserKey = 1, FirstName = "Test" }, 1, false)); + + var stream = new GenericDataTestStream(source, "TestDeltaTrigger"); + stream.Generate(); + + await stream.StartStream(@" + INSERT 
INTO output + SELECT + UserKey, + FirstName + FROM users + "); + await stream.WaitForUpdate(); + + stream.AssertCurrentDataEqual(new List() { new User { UserKey = 1, FirstName = "Test" } }.Select(x => new { x.UserKey, x.FirstName })); + + // Update user 1 + source.AddChange(new FlowtideGenericObject("1", new User { UserKey = 1, FirstName = "Test2" }, 2, false)); + source.AddChange(new FlowtideGenericObject("2", new User { UserKey = 2, FirstName = "Test3" }, 3, false)); + + await stream.Trigger("delta_load"); + await stream.WaitForUpdate(); + + stream.AssertCurrentDataEqual(new List() { new User { UserKey = 1, FirstName = "Test2" }, new User { UserKey = 2, FirstName = "Test3" } }.Select(x => new { x.UserKey, x.FirstName })); + + source.AddChange(new FlowtideGenericObject("1", null, 4, true)); + await stream.Trigger("delta_load_users"); + await stream.WaitForUpdate(); + stream.AssertCurrentDataEqual(new List() { new User { UserKey = 2, FirstName = "Test3" } }.Select(x => new { x.UserKey, x.FirstName })); + + } + } +}
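For implementers that need more control than the generic data source offers, the new `BatchableReadBaseOperator` can also be subclassed directly. The sketch below is illustrative only: the `Order` and `IOrderRepository` types and the two-column schema are assumptions, not part of this change; the abstract members (`FullLoad`, `DeltaLoad`, `GetDeltaLoadTimeSpan`, `GetFullLoadSchedule`) and the `BatchableReadEvent` record are taken from the operator introduced above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;
using FlexBuffers;
using FlowtideDotNet.Core;
using FlowtideDotNet.Core.Operators.Read;
using FlowtideDotNet.Substrait.Relations;

// Hypothetical source types, not part of this change.
public record Order(string Id, string Status, long Version, bool IsDeleted);

public interface IOrderRepository
{
    IAsyncEnumerable<Order> GetAllAsync();
    IAsyncEnumerable<Order> GetChangesAsync(long sinceWatermark);
}

internal class OrderReadOperator : BatchableReadBaseOperator
{
    private readonly IOrderRepository _repository;

    public OrderReadOperator(ReadRelation readRelation, IOrderRepository repository, DataflowBlockOptions options)
        : base(readRelation, options)
    {
        _repository = repository;
    }

    // Runs at startup and whenever the "full_load" trigger fires.
    // The base operator diffs the result against its persistent tree to emit deletions.
    protected override async IAsyncEnumerable<BatchableReadEvent> FullLoad()
    {
        await foreach (var order in _repository.GetAllAsync())
        {
            yield return new BatchableReadEvent(order.Id, ToRowEvent(order, weight: 1), order.Version);
        }
    }

    // Returns only rows changed since the last stored watermark.
    protected override async IAsyncEnumerable<BatchableReadEvent> DeltaLoad(long lastWatermark)
    {
        await foreach (var order in _repository.GetChangesAsync(lastWatermark))
        {
            // A negative weight marks the row as a deletion.
            yield return new BatchableReadEvent(order.Id, ToRowEvent(order, order.IsDeleted ? -1 : 1), order.Version);
        }
    }

    protected override TimeSpan? GetDeltaLoadTimeSpan() => TimeSpan.FromSeconds(5);

    protected override TimeSpan? GetFullLoadSchedule() => TimeSpan.FromHours(1);

    private static RowEvent ToRowEvent(Order order, int weight)
    {
        // Column order must match the ReadRelation base schema (assumed here: id, status).
        return RowEvent.Create(weight, 0, b =>
        {
            b.Add(FlxValue.FromBytes(FlexBuffer.SingleValue(order.Id)));
            b.Add(FlxValue.FromBytes(FlexBuffer.SingleValue(order.Status)));
        });
    }
}
```

Such an operator would be wired up through `ReadWriteFactory.AddReadResolver`, wrapping it in a `ReadOperatorInfo`, in the same way the generic extension in this patch does.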