Skip to content

Commit 66c8df1

Browse files
committed
Add executor spinning in the background
To prevent users from having to implement the feature themselves and provide an alternative to the old spinning behaviour. The new executor uses a long running `Task` instead of a `Thread` to provide better integration with the C# ecosystem.
1 parent 78e8552 commit 66c8df1

File tree

6 files changed

+422
-1
lines changed

6 files changed

+422
-1
lines changed

src/ros2cs/ros2cs_core/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ set(CS_SOURCES
114114
Context.cs
115115
GuardCondition.cs
116116
executors/ManualExecutor.cs
117+
executors/TaskExecutor.cs
117118
properties/AssemblyInfo.cs
118119
)
119120

src/ros2cs/ros2cs_core/executors/ManualExecutor.cs

+21
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
using System.Diagnostics;
1919
using System.Linq;
2020
using System.Threading;
21+
using System.Threading.Tasks;
2122

2223
namespace ROS2.Executors
2324
{
@@ -433,6 +434,26 @@ public void SpinWhile(Func<bool> condition, TimeSpan timeout)
433434
}
434435
}
435436

437+
/// <summary>
438+
/// Create a task which calls <see cref="SpinWhile"/> when started.
439+
/// </summary>
440+
/// <remarks>
441+
/// The resulting task prevents <see cref="TrySpin"/> and <see cref="Rescan"/> from being called
442+
/// and this instance as well as its context from being disposed safely while it is running.
443+
/// </remarks>
444+
/// <param name="timeout"> Maximum time to wait for work to become available. </param>
445+
/// <param name="cancellationToken"> Token to cancel the task. </param>
446+
/// <returns> Task representing the spin operation. </returns>
447+
public Task CreateSpinTask(TimeSpan timeout, CancellationToken cancellationToken)
448+
{
449+
return new Task(() => {
450+
using (cancellationToken.Register(this.Interrupt))
451+
{
452+
this.SpinWhile(() => { cancellationToken.ThrowIfCancellationRequested(); return true; }, timeout);
453+
}
454+
}, cancellationToken, TaskCreationOptions.LongRunning);
455+
}
456+
436457
/// <remarks>
437458
/// This method is not thread safe and may not be called from
438459
/// multiple threads simultaneously or while the executor is in use.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
using System;
2+
using System.Collections;
3+
using System.Collections.Generic;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
namespace ROS2.Executors
8+
{
9+
/// <summary>
10+
/// Executor which wraps a <see cref="ManualExecutor"/> and automatically
11+
/// executes the task created by <see cref="ManualExecutor.CreateSpinTask"/>.
12+
/// </summary>
13+
/// <remarks>
14+
/// The spin task is automatically stopped when <see cref="Dispose"/>
15+
/// is called or the context is shut down.
16+
/// </remarks>
17+
public sealed class TaskExecutor : IExecutor
18+
{
19+
/// <summary>
20+
/// Task managed by this executor.
21+
/// </summary>
22+
public Task Task { get; private set; }
23+
24+
private readonly CancellationTokenSource CancellationSource = new CancellationTokenSource();
25+
26+
private readonly ManualExecutor Executor;
27+
28+
private readonly Context Context;
29+
30+
/// <param name="context"> Context associated with this executor. </param>
31+
/// <param name="timeout"> Maximum time to wait for work to become available. </param>
32+
public TaskExecutor(Context context, TimeSpan timeout)
33+
{
34+
this.Context = context;
35+
this.Executor = new ManualExecutor(context);
36+
this.Task = this.Executor.CreateSpinTask(timeout, this.CancellationSource.Token);
37+
try
38+
{
39+
context.OnShutdown += this.StopSpinTask;
40+
this.Task.Start();
41+
}
42+
catch (SystemException)
43+
{
44+
try
45+
{
46+
context.OnShutdown -= this.StopSpinTask;
47+
}
48+
finally
49+
{
50+
this.Executor.Dispose();
51+
}
52+
throw;
53+
}
54+
}
55+
56+
/// <inheritdoc/>
57+
public bool IsDisposed
58+
{
59+
get => this.Executor.IsDisposed;
60+
}
61+
62+
/// <inheritdoc/>
63+
public int Count
64+
{
65+
get => this.Executor.Count;
66+
}
67+
68+
/// <inheritdoc/>
69+
public bool IsReadOnly
70+
{
71+
get => this.Executor.IsReadOnly;
72+
}
73+
74+
/// <inheritdoc/>
75+
public void Add(INode node)
76+
{
77+
this.Executor.Add(node);
78+
}
79+
80+
/// <inheritdoc/>
81+
public void Clear()
82+
{
83+
this.Executor.Clear();
84+
}
85+
86+
/// <inheritdoc/>
87+
public bool Contains(INode node)
88+
{
89+
return this.Executor.Contains(node);
90+
}
91+
92+
/// <inheritdoc/>
93+
public void CopyTo(INode[] array, int arrayIndex)
94+
{
95+
this.Executor.CopyTo(array, arrayIndex);
96+
}
97+
98+
/// <inheritdoc/>
99+
public bool Remove(INode node)
100+
{
101+
return this.Executor.Remove(node);
102+
}
103+
104+
/// <inheritdoc/>
105+
public IEnumerator<INode> GetEnumerator()
106+
{
107+
return this.Executor.GetEnumerator();
108+
}
109+
110+
/// <inheritdoc/>
111+
IEnumerator IEnumerable.GetEnumerator()
112+
{
113+
return this.GetEnumerator();
114+
}
115+
116+
/// <inheritdoc />
117+
public void ScheduleRescan()
118+
{
119+
this.Executor.ScheduleRescan();
120+
}
121+
122+
/// <inheritdoc />
123+
public bool TryScheduleRescan(INode node)
124+
{
125+
return this.Executor.TryScheduleRescan(node);
126+
}
127+
128+
/// <inheritdoc />
129+
public void Wait()
130+
{
131+
this.Executor.Wait();
132+
}
133+
134+
/// <inheritdoc />
135+
public bool TryWait(TimeSpan timeout)
136+
{
137+
return this.Executor.TryWait(timeout);
138+
}
139+
140+
/// <summary>
141+
/// Stop the spin task and return after it has stopped.
142+
/// </summary>
143+
/// <remarks>
144+
/// This function returns immediately if the spin task
145+
/// has already been stopped.
146+
/// </remarks>
147+
private void StopSpinTask()
148+
{
149+
try
150+
{
151+
this.CancellationSource.Cancel();
152+
}
153+
catch (ObjectDisposedException)
154+
{
155+
// task has been canceled before
156+
}
157+
try
158+
{
159+
this.Task.Wait();
160+
}
161+
catch (AggregateException e)
162+
{
163+
e.Handle(inner => inner is TaskCanceledException);
164+
}
165+
catch (ObjectDisposedException)
166+
{
167+
// task has already stopped
168+
}
169+
}
170+
171+
/// <inheritdoc />
172+
/// <remarks>
173+
/// The wrapper handles stopping the spin task.
174+
/// </remarks>
175+
public void Dispose()
176+
{
177+
try
178+
{
179+
this.StopSpinTask();
180+
}
181+
catch (AggregateException)
182+
{
183+
// prevent faulted task from preventing disposal
184+
}
185+
this.Context.OnShutdown -= this.StopSpinTask;
186+
this.Task.Dispose();
187+
this.Executor.Dispose();
188+
this.CancellationSource.Dispose();
189+
}
190+
}
191+
}

src/ros2cs/ros2cs_tests/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ if(BUILD_TESTING)
5454
src/WaitSetTest.cs
5555
src/GuardConditionTest.cs
5656
src/ManualExecutorTest.cs
57+
src/TaskExecutorTest.cs
5758
)
5859

5960
add_dotnet_test(ros2cs_tests

src/ros2cs/ros2cs_tests/src/ManualExecutorTest.cs

+88-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
using System.Collections.Generic;
1717
using System.Diagnostics;
1818
using System.Threading;
19+
using System.Threading.Tasks;
1920
using NUnit.Framework;
2021
using ROS2.Executors;
2122

@@ -319,6 +320,92 @@ public void Clear()
319320
Assert.That(this.Executor.RescanScheduled, Is.True);
320321
}
321322

323+
[Test]
324+
public void SpinInTask()
325+
{
326+
using ManualResetEventSlim wasSpun = new ManualResetEventSlim(false);
327+
using var guardCondition = this.Context.CreateGuardCondition(wasSpun.Set);
328+
this.WaitSet.GuardConditions.Add(guardCondition);
329+
330+
using var cancellationSource = new CancellationTokenSource();
331+
using Task spinTask = this.Executor.CreateSpinTask(TimeSpan.FromSeconds(0.5), cancellationSource.Token);
332+
333+
Assert.That(spinTask.Status, Is.EqualTo(TaskStatus.Created));
334+
335+
spinTask.Start();
336+
try
337+
{
338+
while (spinTask.Status != TaskStatus.Running)
339+
{
340+
Thread.Yield(); // wait for task to be scheduled
341+
}
342+
Assert.That(wasSpun.Wait(TimeSpan.FromSeconds(1)), Is.False);
343+
guardCondition.Trigger();
344+
Assert.That(wasSpun.Wait(TimeSpan.FromSeconds(1)), Is.True);
345+
wasSpun.Reset();
346+
}
347+
finally
348+
{
349+
cancellationSource.Cancel();
350+
try
351+
{
352+
spinTask.Wait();
353+
}
354+
catch (AggregateException e)
355+
{
356+
e.Handle(inner => inner is TaskCanceledException);
357+
}
358+
}
359+
360+
Assert.That(spinTask.Status, Is.EqualTo(TaskStatus.Canceled));
361+
guardCondition.Trigger();
362+
Assert.That(wasSpun.Wait(TimeSpan.FromSeconds(1)), Is.False);
363+
}
364+
365+
[Test]
366+
public void ExceptionWhileSpinningInTask()
367+
{
368+
using var guardCondition = this.Context.CreateGuardCondition(() =>
369+
{
370+
throw new SimulatedException("simulating runtime exception");
371+
});
372+
this.WaitSet.GuardConditions.Add(guardCondition);
373+
374+
using var cancellationSource = new CancellationTokenSource();
375+
using Task spinTask = this.Executor.CreateSpinTask(TimeSpan.FromSeconds(0.5), cancellationSource.Token);
376+
377+
spinTask.Start();
378+
try
379+
{
380+
while (spinTask.Status != TaskStatus.Running)
381+
{
382+
Thread.Yield(); // wait for task to be scheduled
383+
}
384+
guardCondition.Trigger();
385+
var exception = Assert.Throws<AggregateException>(() => spinTask.Wait(TimeSpan.FromSeconds(1)));
386+
Assert.That(exception.InnerExceptions, Has.Some.Matches(new Predicate<Exception>(e => e is SimulatedException)));
387+
Assert.That(spinTask.Status, Is.EqualTo(TaskStatus.Faulted));
388+
}
389+
finally
390+
{
391+
cancellationSource.Cancel();
392+
try
393+
{
394+
spinTask.Wait();
395+
}
396+
catch (AggregateException e)
397+
{
398+
e.Handle(inner => inner is TaskCanceledException || inner is SimulatedException);
399+
}
400+
}
401+
}
402+
403+
private sealed class SimulatedException : Exception
404+
{
405+
public SimulatedException(string msg) : base(msg)
406+
{ }
407+
}
408+
322409
private sealed class DummyExecutor : HashSet<INode>, IExecutor
323410
{
324411
public bool IsDisposed
@@ -343,7 +430,7 @@ public bool TryWait(TimeSpan timeout)
343430
}
344431

345432
public void Dispose()
346-
{}
433+
{ }
347434
}
348435
}
349436
}

0 commit comments

Comments
 (0)