Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ETag sample code and Blog #929

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion Garnet.sln
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Microsoft Visual Studio Solution File, Format Version 12.00
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.0.31808.319
MinimumVisualStudioVersion = 10.0.40219.1
Expand Down Expand Up @@ -111,6 +111,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Garnet.resources", "libs\re
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NoOpModule", "playground\NoOpModule\NoOpModule.csproj", "{D4C9A1A0-7053-F072-21F5-4E0C5827136D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ETag", "samples\ETag\ETag.csproj", "{4FBA1587-BAFC-49F8-803A-D1CF431A26F5}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -335,6 +337,14 @@ Global
{D4C9A1A0-7053-F072-21F5-4E0C5827136D}.Release|Any CPU.Build.0 = Release|Any CPU
{D4C9A1A0-7053-F072-21F5-4E0C5827136D}.Release|x64.ActiveCfg = Release|Any CPU
{D4C9A1A0-7053-F072-21F5-4E0C5827136D}.Release|x64.Build.0 = Release|Any CPU
{4FBA1587-BAFC-49F8-803A-D1CF431A26F5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4FBA1587-BAFC-49F8-803A-D1CF431A26F5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4FBA1587-BAFC-49F8-803A-D1CF431A26F5}.Debug|x64.ActiveCfg = Debug|Any CPU
{4FBA1587-BAFC-49F8-803A-D1CF431A26F5}.Debug|x64.Build.0 = Debug|Any CPU
{4FBA1587-BAFC-49F8-803A-D1CF431A26F5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4FBA1587-BAFC-49F8-803A-D1CF431A26F5}.Release|Any CPU.Build.0 = Release|Any CPU
{4FBA1587-BAFC-49F8-803A-D1CF431A26F5}.Release|x64.ActiveCfg = Release|Any CPU
{4FBA1587-BAFC-49F8-803A-D1CF431A26F5}.Release|x64.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -370,6 +380,7 @@ Global
{DF2DD03E-87EE-482A-9FBA-6C8FBC23BDC5} = {697766CD-2046-46D9-958A-0FD3B46C98D4}
{A48412B4-FD60-467E-A5D9-F155CAB4F907} = {147FCE31-EC09-4C90-8E4D-37CA87ED18C3}
{D4C9A1A0-7053-F072-21F5-4E0C5827136D} = {69A71E2C-00E3-42F3-854E-BE157A24834E}
{4FBA1587-BAFC-49F8-803A-D1CF431A26F5} = {7068BB97-1958-4060-B5F1-859464592E56}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2C02C405-4798-41CA-AF98-61EDFEF6772E}
Expand Down
3 changes: 2 additions & 1 deletion libs/server/Storage/Functions/MainStore/RMWMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,10 @@ private bool InPlaceUpdaterWorker(ref SpanByte key, ref RawStringInput input, re
// Increment the ETag
long newEtag = functionsState.etagState.etag + 1;

rmwInfo.ClearExtraValueLength(ref recordInfo, ref value, value.TotalSize);
value.UnmarkExtraMetadata();
value.ShrinkSerializedLength(metadataSize + inputValue.Length + EtagConstants.EtagSize);
rmwInfo.SetUsedValueLength(ref recordInfo, ref value, value.TotalSize);
hamdaankhalid marked this conversation as resolved.
Show resolved Hide resolved
rmwInfo.ClearExtraValueLength(ref recordInfo, ref value, value.TotalSize);

value.SetEtagInPayload(newEtag);

Expand Down
192 changes: 192 additions & 0 deletions samples/ETag/Caching.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis;

namespace ETag;

public class Caching
{
/*
The whole idea of using ETag based commands for caching purposes is to reduce network utilization by only sending and recieving
what is needed over the network.

Scenario:
We are in an application, cache, and database setup.
In the read path the application always attempts to read from the cache and based on a hit or a miss it reaches into the database.
In the write path the application may use "write-through" or "write-back" to internally update the cache. The write path will be simulated in it's own thread.
The read path will be interacted with via a REPL.

Anytime the client stores a local "copy" of data that may exist on the cache, it will first make a call to the Cache and based on a hit or miss it will reach into the database.
Everything till now describes your commonly used caching use-case.

ETags help further speed up your Cache-Hit use case. When the client uses the Garnet GETIFNOTMATCH command they send their current ETag and only incur the extra network bandwidth
of recieving the entire payload when their local version is different from what is there on the server. With large payloads this can help reduce latency in your cache-hit route.
*/

public static async Task RunSimulation()
{
// LocalApplicationState represents the data that you keep within your Client's reach
Dictionary<int, (long, MovieReview)> localApplicationState = new Dictionary<int, (long, MovieReview)>();

Console.WriteLine("Seeding server and local state...");
await SeedCache(localApplicationState);

Console.WriteLine("Booting up fake server threads...");
// run fake server threads in the background that invalidates entries in the cache and changes things either in write-through or write-back manner
CancellationTokenSource cts = new CancellationTokenSource();
Task srvThread1 = Task.Run(() => FakeServerThread(cts.Token), cts.Token);
Task srvThread2 = Task.Run(() => FakeServerThread(cts.Token), cts.Token);
Task srvThread3 = Task.Run(() => FakeServerThread(cts.Token), cts.Token);

// Run interactive repl (application)
await InteractiveRepl(localApplicationState);

cts.Cancel();
try
{
await Task.WhenAll(srvThread1, srvThread2, srvThread3);
}
catch (OperationCanceledException)
{
Console.WriteLine("Server threads killed.");
}
}

static async Task InteractiveRepl(Dictionary<int, (long, MovieReview)> localApplicationState)
{
using var redis = await ConnectionMultiplexer.ConnectAsync(GarnetConnectionStr);
var db = redis.GetDatabase(0);

while (true)
{
Console.WriteLine("Enter a review ID (0-19) to fetch or type 'exit' to quit:");
string input = Console.ReadLine()!;

if (input.ToLower() == "exit")
{
break;
}

if (int.TryParse(input, out int reviewId) && reviewId >= 0 && reviewId <= 19)
{
var (existingEtag, existingItem) = localApplicationState[reviewId];
var (etag, movieReview) = await ETagAbstractions.GetIfNotMatch<MovieReview>(db, reviewId.ToString(), existingEtag, existingItem);

if (movieReview != null)
{
// update local application state/in-memory cache
localApplicationState[reviewId] = (etag, movieReview);
Console.WriteLine($"Movie Name: {movieReview.MovieName}");
Console.WriteLine($"Reviewer Name: {movieReview.ReviewerName}");
Console.WriteLine($"Rating: {movieReview.Rating}");
Console.WriteLine($"Review: {movieReview.Review.Substring(0, 50)}...");
}
else
{
Console.WriteLine("Review not found.");
}
}
else
{
Console.WriteLine("Invalid input. Please enter a number between 0 and 19.");
}
}
}

static async Task SeedCache(Dictionary<int, (long, MovieReview)> localApplicationState)
{
Random random = new Random();
using var redis = await ConnectionMultiplexer.ConnectAsync(GarnetConnectionStr);
var db = redis.GetDatabase(0);
// Add a bunch of things with sufficiently large payloads into your cache, the maximum size of your values depends on your pagesize config on Garnet
for (int i = 0; i < 20; i++)
{
string key = i.ToString();
MovieReview movieReview = MovieReview.CreateRandomReview(random);
string value = JsonSerializer.Serialize(movieReview);
long etag = (long)await db.ExecuteAsync("SET", key, value, "WITHETAG");
localApplicationState.Add(i, (etag, movieReview));
Console.WriteLine($"Seeded {i}");
}
}

static async Task FakeServerThread(CancellationToken token)
{
Random random = new Random();
using var redis = await ConnectionMultiplexer.ConnectAsync(GarnetConnectionStr);
var db = redis.GetDatabase(0);

// Run a loop where you are updating the items every now and then
while (true)
{
token.ThrowIfCancellationRequested();
// choose a random number [0 - 19] aka review ID in our database
// change the review and rating for it
string serverToMessWith = random.Next(19).ToString();
var (etag, movieReview) = await ETagAbstractions.GetWithEtag<MovieReview>(db, serverToMessWith);
await ETagAbstractions.PerformLockFreeSafeUpdate<MovieReview>(db, serverToMessWith, etag, movieReview!,
(moviewReview) =>
{
// the application server decides to reduce or increase the moview review rating
moviewReview.Review += random.Next(-2, 2);
});

// sleep anywhere from 10-60 seconds
await Task.Delay(TimeSpan.FromSeconds(random.Next(10, 60)));
}
}

static string GarnetConnectionStr = "localhost:6379,connectTimeout=999999,syncTimeout=999999";
}

class MovieReview
{
[JsonPropertyName("movie_name")]
public required string MovieName { get; set; }

[JsonPropertyName("reviewer_name")]
public required string ReviewerName { get; set; }

[JsonPropertyName("rating")]
public required int Rating { get; set; }

[JsonPropertyName("review")]
public required string Review { get; set; }

public static MovieReview CreateRandomReview(Random random)
{
var movieName = $"{CommonWords[random.Next(CommonWords.Length)]} {CommonWords[random.Next(CommonWords.Length)]}";
var reviewerName = $"{CommonWords[random.Next(CommonWords.Length)]} {CommonWords[random.Next(CommonWords.Length)]}";
var rating = random.Next(0, 101);
var review = GenerateLargeLoremIpsumText(1 * 1024 * 1024); // 1MB of text

return new MovieReview
{
MovieName = movieName,
ReviewerName = reviewerName,
Rating = rating,
Review = review
};
}

private static string GenerateLargeLoremIpsumText(int sizeInBytes)
{
const string loremIpsum = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. ";
var stringBuilder = new StringBuilder();

while (Encoding.UTF8.GetByteCount(stringBuilder.ToString()) < sizeInBytes)
{
stringBuilder.Append(loremIpsum);
}

return stringBuilder.ToString();
}

private static readonly string[] CommonWords = ["The", "Amazing", "Incredible", "Fantastic", "Journey", "Adventure", "Mystery", "Legend", "Quest", "Saga", "John", "Jane", "Smith", "Doe", "Alice", "Bob", "Charlie", "David", "Eve", "Frank"];
}
14 changes: 14 additions & 0 deletions samples/ETag/ETag.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>disable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="StackExchange.Redis" />
</ItemGroup>

</Project>
111 changes: 111 additions & 0 deletions samples/ETag/EtagAbstractions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
using System;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;

namespace ETag;

public static class ETagAbstractions
{
/// <summary>
/// Performs a lock-free update on an item in the database using a compare-and-swap mechanism.
/// </summary>
/// <typeparam name="T">The type of the item to be updated.</typeparam>
/// <param name="db">The database instance where the item is stored.</param>
/// <param name="key">The key identifying the item in the database.</param>
/// <param name="initialEtag">The initial ETag value of the item.</param>
/// <param name="initialItem">The initial state of the item.</param>
/// <param name="updateAction">The action to perform on the item before updating it in the database.</param>
/// <returns>A task that represents the asynchronous operation. The task result contains a tuple with the final ETag value and the updated item.</returns>
public static async Task<(long, T)> PerformLockFreeSafeUpdate<T>(IDatabase db, string key, long initialEtag, T initialItem, Action<T> updateAction)
{
// Compare and Swap Updating
long etag = initialEtag;
T item = initialItem;
while (true)
{
// perform custom action, since item is updated to it's correct latest state by the server this action is performed exactly once on
// an item before it is finally updated on the server.
// NOTE: Based on your application's needs you can modify this method to update a pure function that returns a copy of the data and does not use mutations as side effects.
updateAction(item);

var (updatedSuccesful, newEtag, newItem) = await _updateItemIfMatch(db, etag, key, item);
etag = newEtag;
item = newItem;

if (updatedSuccesful)
break;
}

return (etag, item);
}

/// <summary>
/// Retrieves an item from the database if the provided ETag does not match the existing ETag.
/// Saves the network badwidth usage for cases where we have the right state in-memory already
/// </summary>
/// <typeparam name="T">The type of the item to be retrieved.</typeparam>
/// <param name="db">The database instance to execute the command on.</param>
/// <param name="key">The key of the item to be retrieved.</param>
/// <param name="existingEtag">The existing ETag to compare against.</param>
/// <returns>
/// A tuple containing the new ETag and the item if the ETag does not match; otherwise, a tuple with -1 and the default value of T.
/// </returns>
public static async Task<(long, T?)> GetIfNotMatch<T>(IDatabase db, string key, long existingEtag, T existingItem, ILogger? logger = null)
{
RedisResult res = await db.ExecuteAsync("GETIFNOTMATCH", key, existingEtag);
if (res.IsNull)
return (-1, default);

long etag = (long)res[0];

if (res[1].IsNull)
{
logger?.LogInformation("Network overhead saved, what we have is already good.");
return (etag, existingItem);
}

logger?.LogInformation("Network overhead incurred, entire item retrieved over network.");
T item = JsonSerializer.Deserialize<T>((string)res[1]!)!;
return (etag, item);
}

/// <summary>
/// Retrieves an item from the database along with its ETag.
/// </summary>
/// <typeparam name="T">The type of the item to retrieve.</typeparam>
/// <param name="db">The database instance to query.</param>
/// <param name="key">The key of the item to retrieve.</param>
/// <returns>
/// A tuple containing the ETag as a long and the item casted to type T.
/// If the database call returns null, the ETag will be -1 and the item will be null.
/// </returns>
public static async Task<(long, T?)> GetWithEtag<T>(IDatabase db, string key)
{
var executeResult = await db.ExecuteAsync("GETWITHETAG", key);
// If key is not found we get null
if (executeResult.IsNull)
{
return (-1, default(T));
}

RedisResult[] result = (RedisResult[])executeResult!;
long etag = (long)result[0];
T item = JsonSerializer.Deserialize<T>((string)result[1]!)!;
return (etag, item);
}

private static async Task<(bool updated, long etag, T)> _updateItemIfMatch<T>(IDatabase db, long etag, string key, T value)
{
// You may notice the "!" that is because we know that SETIFMATCH doesn't return null
string serializedItem = JsonSerializer.Serialize<T>(value);
RedisResult[] res = (RedisResult[])(await db.ExecuteAsync("SETIFMATCH", key, serializedItem, etag))!;

if (res[1].IsNull)
return (true, (long)res[0], value);

T deserializedItem = JsonSerializer.Deserialize<T>((string)res[1]!)!;
return (false, (long)res[0], deserializedItem);
}
}
Loading
Loading