Skip to content

Commit

Permalink
Modified import functions to be non blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
ryannewington committed Dec 19, 2018
1 parent 554d812 commit aee9e11
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 66 deletions.
23 changes: 22 additions & 1 deletion src/Lithnet.Okta.ManagementAgent/AsyncHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Lithnet.Okta.ManagementAgent
internal static class AsyncHelper
{
private static readonly TaskFactory factory = new TaskFactory(CancellationToken.None, TaskCreationOptions.None, TaskContinuationOptions.None, TaskScheduler.Default);

// This function destroys ILMerge, so we switched to ILRepack instead. The GetAwaiter() call seems to be responsible for making ILMerge hang
public static TResult RunSync<TResult>(Task<TResult> func)
{
Expand All @@ -33,5 +33,26 @@ public static void RunSync(Task func, CancellationToken token)
{
factory.StartNew(() => func, token).Unwrap().GetAwaiter().GetResult();
}

public static long InterlockedCombine(ref long location,
Func<long, long> update)
{
long initialValue, newValue;

do
{
initialValue = location;
newValue = update(initialValue);
}
while (Interlocked.CompareExchange(ref location, newValue, initialValue) != initialValue);

return initialValue;
}

public static long InterlockedMax(ref long location, long value)
{
return InterlockedCombine(ref location,
v => Math.Max(v, value));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@
<Reference Include="FlexibleConfiguration, Version=1.2.1.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\FlexibleConfiguration.1.2.1\lib\net45\FlexibleConfiguration.dll</HintPath>
</Reference>
<Reference Include="Lithnet.Ecma2Framework, Version=1.0.6926.17296, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Lithnet.Ecma2Framework.1.0.6926.17296\lib\net461\Lithnet.Ecma2Framework.dll</HintPath>
<Reference Include="Lithnet.Ecma2Framework, Version=1.0.6927.15182, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Lithnet.Ecma2Framework.1.0.6927.15182\lib\net461\Lithnet.Ecma2Framework.dll</HintPath>
</Reference>
<Reference Include="Lithnet.MetadirectoryServices, Version=1.0.6730.35901, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Lithnet.MetadirectoryServices.1.0.6730.35901\lib\net461\Lithnet.MetadirectoryServices.dll</HintPath>
Expand Down Expand Up @@ -189,6 +189,9 @@
<Private>True</Private>
</Reference>
<Reference Include="System.ServiceModel" />
<Reference Include="System.Threading.Tasks.Dataflow, Version=4.6.3.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<HintPath>..\packages\System.Threading.Tasks.Dataflow.4.9.0\lib\netstandard2.0\System.Threading.Tasks.Dataflow.dll</HintPath>
</Reference>
<Reference Include="System.Transactions" />
<Reference Include="System.ValueTuple, Version=4.0.2.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51, processorArchitecture=MSIL">
<HintPath>..\packages\System.ValueTuple.4.4.0\lib\net461\System.ValueTuple.dll</HintPath>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Lithnet.Ecma2Framework;
using Lithnet.MetadirectoryServices;
using Microsoft.MetadirectoryServices;
Expand All @@ -16,49 +17,78 @@ internal class GroupImportProvider : IObjectImportProvider
{
private const string GroupUpdateKey = "group";
private const string GroupMemberUpdateKey = "group-member";

private DateTime? lastGroupUpdateHighWatermark;

private DateTime? lastGroupMemberUpdateHighWatermark;

private static Logger logger = LogManager.GetCurrentClassLogger();

public void GetCSEntryChanges(ImportContext context, SchemaType type)
{
ParallelOptions options = new ParallelOptions { CancellationToken = context.CancellationTokenSource.Token };
AsyncHelper.RunSync(this.GetCSEntryChangesAsync(context, type), context.CancellationTokenSource.Token);
}

if (Debugger.IsAttached)
public async Task GetCSEntryChangesAsync(ImportContext context, SchemaType type)
{
try
{
options.MaxDegreeOfParallelism = 1;
IAsyncEnumerable<IGroup> groups = this.GetGroupEnumerable(context.InDelta, context.ConfigParameters, context.IncomingWatermark, ((OktaConnectionContext)context.ConnectionContext).Client);
BufferBlock<IGroup> queue = new BufferBlock<IGroup>();

Task consumer = this.ConsumeObjects(context, type, queue);

// Post source data to the dataflow block.
await this.ProduceObjects(groups, queue).ConfigureAwait(false);

// Wait for the consumer to process all data.
await consumer.ConfigureAwait(false);
}
catch (Exception ex)
{
logger.Error(ex, "There was an error importing the group data");
throw;
}
}

private async Task ProduceObjects(IAsyncEnumerable<IGroup> groups, ITargetBlock<IGroup> target)
{
await groups.ForEachAsync(t => target.Post(t));
target.Complete();
}

object syncObject = new object();
private async Task ConsumeObjects(ImportContext context, SchemaType type, ISourceBlock<IGroup> source)
{
long groupUpdateHighestTicks = 0;
long groupMemberUpdateHighestTicks = 0;

IAsyncEnumerable<IGroup> groups = this.GetGroupEnumerable(context.InDelta, context.ConfigParameters, context.IncomingWatermark, ((OktaConnectionContext)context.ConnectionContext).Client);

Parallel.ForEach(groups.ToEnumerable(), options, group =>
while (await source.OutputAvailableAsync())
{
IGroup group = source.Receive();

try
{
if (group.LastUpdated.HasValue)
{
lock (syncObject)
{
groupUpdateHighestTicks = Math.Max(groupUpdateHighestTicks, group.LastUpdated.Value.Ticks);
groupMemberUpdateHighestTicks = Math.Max(groupMemberUpdateHighestTicks, group.LastMembershipUpdated.Value.Ticks);
}
AsyncHelper.InterlockedMax(ref groupUpdateHighestTicks, group.LastUpdated.Value.Ticks);
}

if (group.LastMembershipUpdated.HasValue)
{
AsyncHelper.InterlockedMax(ref groupMemberUpdateHighestTicks, group.LastMembershipUpdated.Value.Ticks);
}

CSEntryChange c = this.GroupToCSEntryChange(context, type, group);
CSEntryChange c = await this.GroupToCSEntryChange(context, type, group).ConfigureAwait(false);

if (c != null)
{
context.ImportItems.Add(c, context.CancellationTokenSource.Token);
}
}
catch (Exception ex)
{
logger.Error(ex);
GroupImportProvider.logger.Error(ex);
CSEntryChange csentry = CSEntryChange.Create();
csentry.DN = group.Id;
csentry.ErrorCodeImport = MAImportError.ImportErrorCustomContinueRun;
Expand All @@ -67,8 +97,8 @@ public void GetCSEntryChanges(ImportContext context, SchemaType type)
context.ImportItems.Add(csentry, context.CancellationTokenSource.Token);
}

options.CancellationToken.ThrowIfCancellationRequested();
});
context.CancellationTokenSource.Token.ThrowIfCancellationRequested();
}

string wmv;

Expand All @@ -95,7 +125,7 @@ public void GetCSEntryChanges(ImportContext context, SchemaType type)
context.OutgoingWatermark.Add(new Watermark(GroupMemberUpdateKey, wmv, "DateTime"));
}

private CSEntryChange GroupToCSEntryChange(ImportContext context, SchemaType schemaType, IGroup group)
private async Task<CSEntryChange> GroupToCSEntryChange(ImportContext context, SchemaType schemaType, IGroup group)
{
Resource profile = group.GetProperty<Resource>("profile");
logger.Trace($"Creating CSEntryChange for group {group.Id}");
Expand Down Expand Up @@ -124,7 +154,9 @@ private CSEntryChange GroupToCSEntryChange(ImportContext context, SchemaType sch
{
IList<object> members = new List<object>();

((OktaConnectionContext) context.ConnectionContext).Client.GetCollection<User>($"/api/v1/groups/{group.Id}/skinny_users").ForEach(u => members.Add(u.Id));
var items = ((OktaConnectionContext) context.ConnectionContext).Client.GetCollection<User>($"/api/v1/groups/{group.Id}/skinny_users");

await items.ForEachAsync(u => members.Add(u.Id)).ConfigureAwait(false);

if (modType == ObjectModificationType.Update)
{
Expand Down Expand Up @@ -180,8 +212,6 @@ private DateTime GetLastHighWatermarkGroupMember(WatermarkKeyedCollection import
return this.GetLastHighWatermark(importState, GroupMemberUpdateKey);
}



private DateTime GetLastHighWatermark(WatermarkKeyedCollection importState, string key)
{
if (importState == null)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Lithnet.Ecma2Framework;
using Lithnet.MetadirectoryServices;
using Microsoft.MetadirectoryServices;
using NLog;
using Okta.Sdk;
using System.Collections.Concurrent;
using System.Threading.Tasks.Dataflow;

namespace Lithnet.Okta.ManagementAgent
{
Expand All @@ -22,62 +21,50 @@ public void GetCSEntryChanges(ImportContext context, SchemaType type)
AsyncHelper.RunSync(this.GetCSEntryChangesAsync(context, type), context.CancellationTokenSource.Token);
}

private async Task GetCSEntryChangesAsync(ImportContext context, SchemaType type)
public async Task GetCSEntryChangesAsync(ImportContext context, SchemaType type)
{
IAsyncEnumerable<IUser> users = this.GetUserEnumerable(context.InDelta, context.IncomingWatermark, ((OktaConnectionContext)context.ConnectionContext).Client, context);
BlockingCollection<IUser> queue = new BlockingCollection<IUser>();

var consumerTask = Task.Run<long>(() => this.ConsumeUserObjects(context, type, queue), context.CancellationTokenSource.Token);

await users.ForEachAsync(t => queue.Add(t)).ConfigureAwait(false);

queue.CompleteAdding();
try
{
IAsyncEnumerable<IUser> users = this.GetUserEnumerable(context.InDelta, context.IncomingWatermark, ((OktaConnectionContext)context.ConnectionContext).Client, context);
BufferBlock<IUser> queue = new BufferBlock<IUser>();

long userHighestTicks = await consumerTask.ConfigureAwait(false);
Task consumer = this.ConsumeObjects(context, type, queue);

string wmv;
// Post source data to the dataflow block.
await this.ProduceObjects(users, queue).ConfigureAwait(false);

if (userHighestTicks <= 0)
{
wmv = context.IncomingWatermark["users"].Value;
// Wait for the consumer to process all data.
await consumer.ConfigureAwait(false);
}
else
catch (Exception ex)
{
wmv = userHighestTicks.ToString();
logger.Error(ex, "There was an error importing the user data");
throw;
}

context.OutgoingWatermark.Add(new Watermark("users", wmv, "DateTime"));
}

private long ConsumeUserObjects(ImportContext context, SchemaType type, BlockingCollection<IUser> producer)
private async Task ProduceObjects(IAsyncEnumerable<IUser> users, ITargetBlock<IUser> target)
{
ParallelOptions options = new ParallelOptions { CancellationToken = context.CancellationTokenSource.Token };

if (Debugger.IsAttached)
{
options.MaxDegreeOfParallelism = 1;
}
else
{
options.MaxDegreeOfParallelism = OktaMAConfigSection.Configuration.ImportThreads;
}
await users.ForEachAsync(t => target.Post(t));
target.Complete();
}

object syncObject = new object();
private async Task ConsumeObjects(ImportContext context, SchemaType type, ISourceBlock<IUser> source)
{
long userHighestTicks = 0;

Parallel.ForEach<IUser>(producer.GetConsumingEnumerable(), options, user =>
while (await source.OutputAvailableAsync())
{
IUser user = source.Receive();

try
{
if (user.LastUpdated.HasValue)
{
lock (syncObject)
{
userHighestTicks = Math.Max(userHighestTicks, user.LastUpdated.Value.Ticks);
}
AsyncHelper.InterlockedMax(ref userHighestTicks, user.LastUpdated.Value.Ticks);
}

CSEntryChange c = AsyncHelper.RunSync(this.UserToCSEntryChange(context.InDelta, type, user, context), context.CancellationTokenSource.Token);
CSEntryChange c = await this.UserToCSEntryChange(context.InDelta, type, user, context).ConfigureAwait(false);

if (c != null)
{
Expand All @@ -95,10 +82,21 @@ private long ConsumeUserObjects(ImportContext context, SchemaType type, Blocking
context.ImportItems.Add(csentry, context.CancellationTokenSource.Token);
}

options.CancellationToken.ThrowIfCancellationRequested();
});
context.CancellationTokenSource.Token.ThrowIfCancellationRequested();
}

string wmv;

if (userHighestTicks <= 0)
{
wmv = context.IncomingWatermark["users"].Value;
}
else
{
wmv = userHighestTicks.ToString();
}

return userHighestTicks;
context.OutgoingWatermark.Add(new Watermark("users", wmv, "DateTime"));
}

private async Task<CSEntryChange> UserToCSEntryChange(bool inDelta, SchemaType schemaType, IUser user, ImportContext context)
Expand Down Expand Up @@ -278,4 +276,4 @@ public bool CanImport(SchemaType type)
return type.Name == "user";
}
}
}
}
3 changes: 2 additions & 1 deletion src/Lithnet.Okta.ManagementAgent/packages.config
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<package id="FlexibleConfiguration" version="1.2.1" targetFramework="net461" />
<package id="Fody" version="3.3.3" targetFramework="net461" developmentDependency="true" />
<package id="ILRepack" version="2.0.16" targetFramework="net461" />
<package id="Lithnet.Ecma2Framework" version="1.0.6926.17296" targetFramework="net461" />
<package id="Lithnet.Ecma2Framework" version="1.0.6927.15182" targetFramework="net461" />
<package id="Lithnet.MetadirectoryServices" version="1.0.6730.35901" targetFramework="net461" />
<package id="Microsoft.Extensions.DependencyInjection.Abstractions" version="2.0.0" targetFramework="net461" />
<package id="Microsoft.Extensions.Logging" version="2.0.1" targetFramework="net461" />
Expand Down Expand Up @@ -64,6 +64,7 @@
<package id="System.Text.RegularExpressions" version="4.3.0" targetFramework="net461" />
<package id="System.Threading" version="4.3.0" targetFramework="net461" />
<package id="System.Threading.Tasks" version="4.3.0" targetFramework="net461" />
<package id="System.Threading.Tasks.Dataflow" version="4.9.0" targetFramework="net461" />
<package id="System.Threading.Timer" version="4.3.0" targetFramework="net461" />
<package id="System.ValueTuple" version="4.4.0" targetFramework="net461" />
<package id="System.Xml.ReaderWriter" version="4.3.0" targetFramework="net461" />
Expand Down

0 comments on commit aee9e11

Please sign in to comment.