Generic read source (#266)
* Add generic read source

* Add test

* Fix bug

* Fix code smells

* Fix duplicate code

* Add docs

* Remove full load interval in test

* Check if json document is null
Ulimo authored Jan 6, 2024
1 parent 1f48432 commit dbab878
Showing 13 changed files with 1,017 additions and 61 deletions.
100 changes: 100 additions & 0 deletions docs/docs/connectors/customdata.md
@@ -0,0 +1,100 @@
---
sidebar_position: 6
---

# Custom Data Source

It is possible to fetch data from a custom source such as an API, another database, a file system, etc.

There are multiple ways to do this:

* **Implement GenericDataSource(Async)** - A simplified source that returns C# objects. This is the recommended way to start implementing a source.
* **Implement ReadBaseOperator** - Allows the creation of a low-level read operator, where serialization, state storage, watermarks and checkpointing must be handled manually.

## Generic Data Source

The generic data source allows easy implementation against custom sources that return C# objects.

It allows:

* Full batch reloads, where all the data is imported again and the delta is computed.
* Delta loads, where delta should be returned.
* Custom watermark provided by the source.
* Scheduling of full batch and delta reloads.

There are two classes that can be implemented for the generic data source:

* **GenericDataSourceAsync** - Data is returned as an IAsyncEnumerable; use this with remote sources.
* **GenericDataSource** - Data is returned as an IEnumerable; use this when the data is already in memory.

When implementing a generic data source, it is important to think about memory usage. For instance, do not
fetch all rows, store them in memory, and then return them; this can cause large memory spikes or out-of-memory errors.
Instead, yield return values and fetch the data in batches, as sketched below. The operator stores the data in B+ trees that are
temporarily written to disk if memory usage gets too high.
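
As a minimal sketch of batched loading, the full load below fetches one page at a time. `GetPageAsync(offset, pageSize)` is a hypothetical paging method on the repository used in the implementation example in the next section; it is not part of the connector API, and the rest mirrors that example:

```csharp
// Batched full load: only one page of rows is held in memory at a time.
protected override async IAsyncEnumerable<FlowtideGenericObject<User>> FullLoadAsync()
{
    const int pageSize = 1000;
    long offset = 0;
    while (true)
    {
        // GetPageAsync is a hypothetical repository method returning up to pageSize rows.
        var page = await _userRepository.GetPageAsync(offset, pageSize);
        if (page.Count == 0)
        {
            yield break;
        }
        foreach (var user in page)
        {
            yield return new FlowtideGenericObject<User>(user.Id, user, user.Timestamp);
        }
        offset += page.Count;
    }
}
```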

### Implementation example

```csharp
public class ExampleDataSource : GenericDataSourceAsync<User>
{
    private readonly IUserRepository _userRepository;

    public ExampleDataSource(IUserRepository userRepository)
    {
        _userRepository = userRepository;
    }

    // Fetch the delta every second
    public override TimeSpan? DeltaLoadInterval => TimeSpan.FromSeconds(1);

    // Reload all data every hour. This is not required but can be useful:
    // if, for instance, deletes cannot be found in deltas from the source,
    // a full reload will find all deleted rows.
    public override TimeSpan? FullLoadInterval => TimeSpan.FromHours(1);

    protected override async IAsyncEnumerable<FlowtideGenericObject<User>> DeltaLoadAsync(long lastWatermark)
    {
        var changes = _userRepository.GetChangesFromWatermarkAsync(lastWatermark);

        await foreach (var change in changes)
        {
            yield return new FlowtideGenericObject<User>(change.Id, change, change.Timestamp);
        }
    }

    // Full load of all the data (override name assumed; the counterpart to DeltaLoadAsync)
    protected override async IAsyncEnumerable<FlowtideGenericObject<User>> FullLoadAsync()
    {
        var data = _userRepository.GetAllDataAsync();

        await foreach (var row in data)
        {
            yield return new FlowtideGenericObject<User>(row.Id, row, row.Timestamp);
        }
    }
}
```

To use your data source, add the following to the *ReadWriteFactory*:

```csharp
factory.AddGenericDataSource(
"{regex for the table name}",
(readRelation) => new ExampleDataSource(userRepository));
```
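
For example, a registration whose pattern matches any table name starting with `users` (the pattern and the repository wiring are illustrative):

```csharp
// The first argument is a regex that is matched against the table name used in queries.
factory.AddGenericDataSource(
    "users.*",
    (readRelation) => new ExampleDataSource(userRepository));
```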

### Trigger data reloads programmatically

The generic data source also registers triggers that allow the user to notify the stream when a reload should happen.

The following triggers are registered:

* **full_load** - Does a full load on all running generic data sources
* **delta_load** - Does a delta load on all running generic data sources
* **full_load_\{tableName\}** - Full load for a specific source
* **delta_load_\{tableName\}** - Delta load for a specific source

Example of calling a trigger:

```csharp
await stream.CallTrigger("delta_load", default);
```

Calling the triggers programmatically can be useful when waiting for the next scheduled interval would add too much latency, for example when an external system can notify the stream that data has changed, as sketched below.
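
A minimal sketch of wiring such a notification to a trigger, assuming a hypothetical `changeFeed` event from the source system and a source registered for a table named `users`; only `CallTrigger` comes from the documented API:

```csharp
// Hypothetical change notification from the external system.
changeFeed.OnUsersChanged += async (sender, args) =>
{
    // Ask only the "users" source to do a delta load immediately.
    await stream.CallTrigger("delta_load_users", default);
};
```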
@@ -11,6 +11,7 @@
// limitations under the License.

using FlexBuffers;
using FlowtideDotNet.Core.Flexbuffer;
using System.Text.Json;

namespace FlowtideDotNet.Connector.Kafka
@@ -22,7 +23,12 @@ public FlxValue Deserialize(byte[] bytes)

var jsonDocument = JsonSerializer.Deserialize<JsonDocument>(bytes);

return FlowtideKafkaUpsertJsonDeserializer.JsonElementToValue(jsonDocument.RootElement);
if (jsonDocument == null)
{
return FlxValue.FromBytes(FlexBuffer.Null());
}

return JsonSerializerUtils.JsonElementToValue(jsonDocument.RootElement);
}
}
}
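
The added null check above matters because `JsonSerializer.Deserialize<JsonDocument>` returns null when the payload is the JSON literal `null`, in which case accessing `RootElement` would throw. A small standalone illustration of that behavior (not part of the commit):

```csharp
using System.Text;
using System.Text.Json;

var bytes = Encoding.UTF8.GetBytes("null");
var document = JsonSerializer.Deserialize<JsonDocument>(bytes);
Console.WriteLine(document is null); // True, so the deserializer must handle this case
```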
@@ -12,6 +12,7 @@

using FlexBuffers;
using FlowtideDotNet.Core;
using FlowtideDotNet.Core.Flexbuffer;
using FlowtideDotNet.Substrait.Relations;
using System.Text.Json;

@@ -53,7 +54,7 @@ public RowEvent Deserialize(IFlowtideKafkaKeyDeserializer keyDeserializer, byte[
}
else if (jsonDocument.TryGetProperty(_names[i], out var property))
{
b.Add(JsonElementToValue(property));
b.Add(JsonSerializerUtils.JsonElementToValue(property));
}
else
{
@@ -64,64 +65,6 @@ public RowEvent Deserialize(IFlowtideKafkaKeyDeserializer keyDeserializer, byte[
}
}

internal static FlxValue JsonElementToValue(JsonElement jsonElement)
{
if (jsonElement.ValueKind == JsonValueKind.Null)
{
return FlxValue.FromBytes(FlexBuffer.Null());
}
if (jsonElement.ValueKind == JsonValueKind.True)
{
return FlxValue.FromBytes(FlexBuffer.SingleValue(true));
}
if (jsonElement.ValueKind == JsonValueKind.False)
{
return FlxValue.FromBytes(FlexBuffer.SingleValue(false));
}
if (jsonElement.ValueKind == JsonValueKind.Number)
{
if (jsonElement.TryGetInt64(out var value))
{
return FlxValue.FromBytes(FlexBuffer.SingleValue(value));
}
else if (jsonElement.TryGetDouble(out var doubleValue))
{
return FlxValue.FromBytes(FlexBuffer.SingleValue(doubleValue));
}
else
{
throw new NotImplementedException();
}
}
if (jsonElement.ValueKind == JsonValueKind.String)
{
return FlxValue.FromBytes(FlexBuffer.SingleValue(jsonElement.GetString()));
}
if (jsonElement.ValueKind == JsonValueKind.Array)
{
var bytes = FlexBufferBuilder.Vector(v =>
{
foreach (var item in jsonElement.EnumerateArray())
{
v.Add(JsonElementToValue(item));
}
});
return FlxValue.FromBytes(bytes);
}
if (jsonElement.ValueKind == JsonValueKind.Object)
{
var bytes = FlexBufferBuilder.Map(m =>
{
foreach (var item in jsonElement.EnumerateObject())
{
m.Add(item.Name, JsonElementToValue(item.Value));
}
});
return FlxValue.FromBytes(bytes);
}
throw new NotImplementedException();
}

public Task Initialize(ReadRelation readRelation)
{
_names = readRelation.BaseSchema.Names;
88 changes: 88 additions & 0 deletions src/FlowtideDotNet.Core/Flexbuffer/JsonSerializerUtils.cs
@@ -0,0 +1,88 @@
// Licensed under the Apache License, Version 2.0 (the "License")
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using FlexBuffers;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

namespace FlowtideDotNet.Core.Flexbuffer
{
    public static class JsonSerializerUtils
    {
        public static FlxValue JsonElementToValue(JsonElement jsonElement)
        {
            if (jsonElement.ValueKind == JsonValueKind.Null)
            {
                return FlxValue.FromBytes(FlexBuffer.Null());
            }
            if (jsonElement.ValueKind == JsonValueKind.True)
            {
                return FlxValue.FromBytes(FlexBuffer.SingleValue(true));
            }
            if (jsonElement.ValueKind == JsonValueKind.False)
            {
                return FlxValue.FromBytes(FlexBuffer.SingleValue(false));
            }
            if (jsonElement.ValueKind == JsonValueKind.Number)
            {
                if (jsonElement.TryGetInt64(out var value))
                {
                    return FlxValue.FromBytes(FlexBuffer.SingleValue(value));
                }
                else if (jsonElement.TryGetDouble(out var doubleValue))
                {
                    return FlxValue.FromBytes(FlexBuffer.SingleValue(doubleValue));
                }
                else
                {
                    throw new NotImplementedException();
                }
            }
            if (jsonElement.ValueKind == JsonValueKind.String)
            {
                var str = jsonElement.GetString();
                if (str == null)
                {
                    return FlxValue.FromBytes(FlexBuffer.Null());
                }
                return FlxValue.FromBytes(FlexBuffer.SingleValue(str));
            }
            if (jsonElement.ValueKind == JsonValueKind.Array)
            {
                var bytes = FlexBufferBuilder.Vector(v =>
                {
                    foreach (var item in jsonElement.EnumerateArray())
                    {
                        v.Add(JsonElementToValue(item));
                    }
                });
                return FlxValue.FromBytes(bytes);
            }
            if (jsonElement.ValueKind == JsonValueKind.Object)
            {
                var bytes = FlexBufferBuilder.Map(m =>
                {
                    foreach (var item in jsonElement.EnumerateObject())
                    {
                        m.Add(item.Name, JsonElementToValue(item.Value));
                    }
                });
                return FlxValue.FromBytes(bytes);
            }
            throw new NotImplementedException();
        }
    }
}
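
A minimal usage sketch of the extracted helper; the JSON payload is illustrative:

```csharp
using System.Text.Json;
using FlowtideDotNet.Core.Flexbuffer;

// Parse arbitrary JSON and convert the root element into a FlexBuffer value.
using var doc = JsonDocument.Parse("{\"id\": 1, \"tags\": [\"a\", \"b\"], \"active\": true}");
var flxValue = JsonSerializerUtils.JsonElementToValue(doc.RootElement);
```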
