Skip to content

Commit

Permalink
Add initial commit for adding dynamoDB
Browse files Browse the repository at this point in the history
Completely refactored the original PR to comply with updated standards, and included a container test and configuration - PhatBoyG

Updated DynamoDb with concurrency test and proper condition validation for Add/Insert and Update
  • Loading branch information
Anthony authored and phatboyg committed Feb 3, 2022
1 parent 38fe89a commit 179d846
Show file tree
Hide file tree
Showing 26 changed files with 1,471 additions and 12 deletions.
30 changes: 30 additions & 0 deletions MassTransit.sln
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MassTransit.Newtonsoft", "s
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Benchmarking", "Benchmarking", "{BF384860-70ED-47F0-B276-13D2DA9ECD87}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MassTransit.DynamoDbIntegration", "src\Persistence\MassTransit.DynamoDbIntegration\MassTransit.DynamoDbIntegration.csproj", "{186D6491-ECCD-49EB-8E99-AFD7AD6037D2}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MassTransit.DynamoDbIntegration.Tests", "tests\MassTransit.DynamoDbIntegration.Tests\MassTransit.DynamoDbIntegration.Tests.csproj", "{694E06CF-2842-4E71-8CD2-81FA7C1B3D13}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -793,6 +797,30 @@ Global
{388085C8-1BC8-48F8-8EA2-3698FCB385E5}.ReleaseUnsigned|Any CPU.Build.0 = Debug|Any CPU
{388085C8-1BC8-48F8-8EA2-3698FCB385E5}.ReleaseUnsigned|x86.ActiveCfg = Debug|Any CPU
{388085C8-1BC8-48F8-8EA2-3698FCB385E5}.ReleaseUnsigned|x86.Build.0 = Debug|Any CPU
{186D6491-ECCD-49EB-8E99-AFD7AD6037D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{186D6491-ECCD-49EB-8E99-AFD7AD6037D2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{186D6491-ECCD-49EB-8E99-AFD7AD6037D2}.Debug|x86.ActiveCfg = Debug|Any CPU
{186D6491-ECCD-49EB-8E99-AFD7AD6037D2}.Debug|x86.Build.0 = Debug|Any CPU
{186D6491-ECCD-49EB-8E99-AFD7AD6037D2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{186D6491-ECCD-49EB-8E99-AFD7AD6037D2}.Release|Any CPU.Build.0 = Release|Any CPU
{186D6491-ECCD-49EB-8E99-AFD7AD6037D2}.Release|x86.ActiveCfg = Release|Any CPU
{186D6491-ECCD-49EB-8E99-AFD7AD6037D2}.Release|x86.Build.0 = Release|Any CPU
{186D6491-ECCD-49EB-8E99-AFD7AD6037D2}.ReleaseUnsigned|Any CPU.ActiveCfg = Debug|Any CPU
{186D6491-ECCD-49EB-8E99-AFD7AD6037D2}.ReleaseUnsigned|Any CPU.Build.0 = Debug|Any CPU
{186D6491-ECCD-49EB-8E99-AFD7AD6037D2}.ReleaseUnsigned|x86.ActiveCfg = Debug|Any CPU
{186D6491-ECCD-49EB-8E99-AFD7AD6037D2}.ReleaseUnsigned|x86.Build.0 = Debug|Any CPU
{694E06CF-2842-4E71-8CD2-81FA7C1B3D13}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{694E06CF-2842-4E71-8CD2-81FA7C1B3D13}.Debug|Any CPU.Build.0 = Debug|Any CPU
{694E06CF-2842-4E71-8CD2-81FA7C1B3D13}.Debug|x86.ActiveCfg = Debug|Any CPU
{694E06CF-2842-4E71-8CD2-81FA7C1B3D13}.Debug|x86.Build.0 = Debug|Any CPU
{694E06CF-2842-4E71-8CD2-81FA7C1B3D13}.Release|Any CPU.ActiveCfg = Release|Any CPU
{694E06CF-2842-4E71-8CD2-81FA7C1B3D13}.Release|Any CPU.Build.0 = Release|Any CPU
{694E06CF-2842-4E71-8CD2-81FA7C1B3D13}.Release|x86.ActiveCfg = Release|Any CPU
{694E06CF-2842-4E71-8CD2-81FA7C1B3D13}.Release|x86.Build.0 = Release|Any CPU
{694E06CF-2842-4E71-8CD2-81FA7C1B3D13}.ReleaseUnsigned|Any CPU.ActiveCfg = Debug|Any CPU
{694E06CF-2842-4E71-8CD2-81FA7C1B3D13}.ReleaseUnsigned|Any CPU.Build.0 = Debug|Any CPU
{694E06CF-2842-4E71-8CD2-81FA7C1B3D13}.ReleaseUnsigned|x86.ActiveCfg = Debug|Any CPU
{694E06CF-2842-4E71-8CD2-81FA7C1B3D13}.ReleaseUnsigned|x86.Build.0 = Debug|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -849,6 +877,8 @@ Global
{388085C8-1BC8-48F8-8EA2-3698FCB385E5} = {4F40E08B-7C24-4D2A-8476-B7F93D0A2910}
{667E52D5-E1D9-49EE-B364-3CB7E43EE160} = {BF384860-70ED-47F0-B276-13D2DA9ECD87}
{2B6ACCF2-0CAF-4152-901F-0048390971E4} = {BF384860-70ED-47F0-B276-13D2DA9ECD87}
{186D6491-ECCD-49EB-8E99-AFD7AD6037D2} = {56F516D7-BC3C-49E1-A639-83C5F14953F8}
{694E06CF-2842-4E71-8CD2-81FA7C1B3D13} = {56F516D7-BC3C-49E1-A639-83C5F14953F8}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {43D3A7D5-0945-435E-8D03-1E631E5CDBA8}
Expand Down
2 changes: 0 additions & 2 deletions signing.props
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
<Project>
<!-- These properties will be used only by projects that sign the assemblies -->

<!-- <PropertyGroup Condition="'$(MSBuildProjectName.Contains(Tests))' == false OR $(MSBuildProjectName) == 'MassTransit.RedisIntegration'"> -->
<PropertyGroup>
<!-- Assembly Signing -->
<AssemblyOriginatorKeyFile>$(MSBuildThisFileDirectory)MassTransit.snk</AssemblyOriginatorKeyFile>
<SignAssembly>true</SignAssembly>
</PropertyGroup>
Expand Down
110 changes: 110 additions & 0 deletions src/MassTransit.TestFramework/Sagas/ChoirTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
namespace MassTransit.TestFramework.Sagas
{
namespace ChoirConcurrency
{
using System;


public class RehearsalBegins
{
public Guid CorrelationId { get; set; }
}


public class Bass
{
public Guid CorrelationId { get; set; }
public string Name { get; set; }
}


public class Baritone
{
public Guid CorrelationId { get; set; }
public string Name { get; set; }
}


public class Tenor
{
public Guid CorrelationId { get; set; }
public string Name { get; set; }
}


public class Countertenor
{
public Guid CorrelationId { get; set; }
public string Name { get; set; }
}


public class ChoirState :
SagaStateMachineInstance,
ISagaVersion
{
public string CurrentState { get; set; }
public int Harmony { get; set; }

public string BassName { get; set; }
public string BaritoneName { get; set; }
public string TenorName { get; set; }
public string CountertenorName { get; set; }
public int Version { get; set; }

public Guid CorrelationId { get; set; }
}


public class ChoirStateMachine :
MassTransitStateMachine<ChoirState>
{
public ChoirStateMachine()
{
InstanceState(x => x.CurrentState);

Event(() => RehearsalStarts, x => x.CorrelateById(context => context.Message.CorrelationId));
Event(() => BassStarts, x => x.CorrelateById(context => context.Message.CorrelationId));
Event(() => BaritoneStarts, x => x.CorrelateById(context => context.Message.CorrelationId));
Event(() => TenorStarts, x => x.CorrelateById(context => context.Message.CorrelationId));
Event(() => CountertenorStarts, x => x.CorrelateById(context => context.Message.CorrelationId));

CompositeEvent(() => AllSinging, x => x.Harmony, BassStarts, BaritoneStarts, TenorStarts, CountertenorStarts);

Initially(
When(RehearsalStarts)
.Then(context => Console.WriteLine("Rehearsal Started!!"))
.TransitionTo(Warmup));

During(Warmup,
When(BassStarts)
.Then(context => Console.WriteLine("Bass Started!!"))
.Then(context => context.Saga.BassName = context.Message.Name),
When(BaritoneStarts)
.Then(context => Console.WriteLine("Baritone Started!!"))
.Then(context => context.Saga.BaritoneName = context.Message.Name),
When(TenorStarts)
.Then(context => Console.WriteLine("Tenor Started!!"))
.Then(context => context.Saga.TenorName = context.Message.Name),
When(CountertenorStarts)
.Then(context => Console.WriteLine("CounterTenor Started!!"))
.Then(context => context.Saga.CountertenorName = context.Message.Name),
When(AllSinging)
.Then(context => Console.WriteLine("Harmony Reached!!"))
.TransitionTo(Harmony));
}

//
// ReSharper disable UnassignedGetOnlyAutoProperty
// ReSharper disable MemberCanBePrivate.Global
public Event<RehearsalBegins> RehearsalStarts { get; }
public Event<Bass> BassStarts { get; }
public Event<Baritone> BaritoneStarts { get; }
public Event<Tenor> TenorStarts { get; }
public Event<Countertenor> CountertenorStarts { get; }
public Event AllSinging { get; }
public State Warmup { get; }
public State Harmony { get; }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
namespace MassTransit.Configuration
{
using System;
using System.Collections.Generic;
using Amazon.DynamoDBv2.DataModel;
using DynamoDbIntegration.Saga;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Saga;


public class DynamoDbSagaRepositoryConfigurator<TSaga> :
IDynamoDbSagaRepositoryConfigurator<TSaga>,
ISpecification
where TSaga : class, ISagaVersion
{
Func<IServiceProvider, IDynamoDBContext> _contextFactory;

public DynamoDbSagaRepositoryConfigurator()
{
LockTimeout = TimeSpan.FromMinutes(30);
}

public string TableName { get; set; }
public string LockSuffix { get; set; }
public TimeSpan LockTimeout { get; set; }
public TimeSpan? Expiration { get; set; }

public void ContextFactory(Func<IDynamoDBContext> contextFactory)
{
_contextFactory = provider => contextFactory();
}

public void ContextFactory(Func<IServiceProvider, IDynamoDBContext> contextFactory)
{
_contextFactory = contextFactory;
}

public IEnumerable<ValidationResult> Validate()
{
if (_contextFactory == null)
yield return this.Failure("ContextFactory", "must be specified");
if (string.IsNullOrWhiteSpace(TableName))
yield return this.Failure("TableName", "must be specified");
if (LockTimeout <= TimeSpan.Zero)
yield return this.Failure("LockTimeout", "Must be > TimeSpan.Zero");
if (Expiration < TimeSpan.FromSeconds(30))
yield return this.Failure("Expiration", "If specified, must be > 30 seconds");
}

public void Register<T>(ISagaRepositoryRegistrationConfigurator<T> configurator)
where T : class, ISagaVersion
{
configurator.TryAddSingleton(_contextFactory);
configurator.TryAddSingleton(new DynamoDbSagaRepositoryOptions<T>(TableName, Expiration));
configurator.RegisterSagaRepository<T, DatabaseContext<T>, SagaConsumeContextFactory<DatabaseContext<T>, T>,
DynamoDbSagaRepositoryContextFactory<T>>();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
namespace MassTransit.Configuration
{
using System;
using Internals;


public class DynamoDbSagaRepositoryRegistrationProvider :
ISagaRepositoryRegistrationProvider
{
readonly Action<IDynamoDbSagaRepositoryConfigurator> _configure;

public DynamoDbSagaRepositoryRegistrationProvider(Action<IDynamoDbSagaRepositoryConfigurator> configure)
{
_configure = configure;
}

void ISagaRepositoryRegistrationProvider.Configure<TSaga>(ISagaRegistrationConfigurator<TSaga> configurator)
where TSaga : class
{
if (typeof(TSaga).HasInterface<ISagaVersion>())
{
var proxy = (IProxy)Activator.CreateInstance(typeof(Proxy<>).MakeGenericType(typeof(TSaga)), configurator);

proxy.Configure(this);
}
}

protected virtual void Configure<TSaga>(ISagaRegistrationConfigurator<TSaga> configurator)
where TSaga : class, ISagaVersion
{
configurator.DynamoDbRepository(r => _configure?.Invoke(r));
}


interface IProxy
{
public void Configure<T>(T provider)
where T : DynamoDbSagaRepositoryRegistrationProvider;
}


class Proxy<TSaga> :
IProxy
where TSaga : class, ISagaVersion
{
readonly ISagaRegistrationConfigurator<TSaga> _configurator;

public Proxy(ISagaRegistrationConfigurator<TSaga> configurator)
{
_configurator = configurator;
}

public void Configure<T>(T provider)
where T : DynamoDbSagaRepositoryRegistrationProvider
{
provider.Configure(_configurator);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
namespace MassTransit
{
using System;
using Configuration;


public static class DynamoDbSagaRepositoryRegistrationExtensions
{
/// <summary>
/// Adds a DynamoDb saga repository to the registration
/// </summary>
/// <param name="configurator"></param>
/// <param name="configure"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static ISagaRegistrationConfigurator<T> DynamoDbRepository<T>(this ISagaRegistrationConfigurator<T> configurator,
Action<IDynamoDbSagaRepositoryConfigurator<T>> configure = null)
where T : class, ISagaVersion
{
var repositoryConfigurator = new DynamoDbSagaRepositoryConfigurator<T>();

configure?.Invoke(repositoryConfigurator);

repositoryConfigurator.Validate().ThrowIfContainsFailure("The DynamoDb saga repository configuration is invalid:");

configurator.Repository(x => repositoryConfigurator.Register(x));

return configurator;
}

/// <summary>
/// Use the DynamoDb saga repository for sagas configured by type (without a specific generic call to AddSaga/AddSagaStateMachine)
/// </summary>
/// <param name="configurator"></param>
/// <param name="configure"></param>
public static void SetDynamoDbSagaRepositoryProvider(this IRegistrationConfigurator configurator, Action<IDynamoDbSagaRepositoryConfigurator> configure)
{
configurator.SetSagaRepositoryProvider(new DynamoDbSagaRepositoryRegistrationProvider(configure));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
namespace MassTransit
{
using System;
using Amazon.DynamoDBv2.DataModel;


public interface IDynamoDbSagaRepositoryConfigurator
{
string TableName { set; }
string LockSuffix { set; }
TimeSpan LockTimeout { set; }
TimeSpan? Expiration { set; }

/// <summary>
/// Factory method to get the DynamoDb context
/// </summary>
/// <param name="contextFactory"></param>
void ContextFactory(Func<IDynamoDBContext> contextFactory);

/// <summary>
/// Use the container to build the DynamoDb context
/// </summary>
/// <param name="contextFactory"></param>
void ContextFactory(Func<IServiceProvider, IDynamoDBContext> contextFactory);
}


public interface IDynamoDbSagaRepositoryConfigurator<TSaga> :
IDynamoDbSagaRepositoryConfigurator
where TSaga : class, ISagaVersion
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
namespace MassTransit.DynamoDbIntegration.Saga
{
using System;
using System.Threading.Tasks;


public interface DatabaseContext<TSaga> :
IDisposable
where TSaga : class, ISagaVersion
{
Task Add(SagaConsumeContext<TSaga> context);

Task Insert(TSaga instance);

Task<TSaga> Load(Guid correlationId);

Task Update(SagaConsumeContext<TSaga> context);

Task Delete(SagaConsumeContext<TSaga> context);
}
}
Loading

0 comments on commit 179d846

Please sign in to comment.