Skip to content

Commit

Permalink
CSHARP-4144: Implemented ChangeStreamOptions.ShowExpandedEvents optio…
Browse files Browse the repository at this point in the history
…n for improved change stream event visibility.
  • Loading branch information
JamesKovacs committed Jul 15, 2022
1 parent f219f75 commit fc1602e
Show file tree
Hide file tree
Showing 18 changed files with 1,137 additions and 10 deletions.
41 changes: 41 additions & 0 deletions src/MongoDB.Driver.Core/ChangeStreamDocument.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,28 @@ public ChangeStreamDocument(
/// </value>
public CollectionNamespace CollectionNamespace => GetValue<CollectionNamespace>(nameof(CollectionNamespace), null);

/// <summary>
/// Gets ui field from the oplog entry corresponding to the change event.
/// Only present when the showExpandedEvents change stream option is enabled and for the following event types (MongoDB 6.0 and later):
/// <list type="bullet">
/// <item><description><see cref="ChangeStreamOperationType.Create"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.CreateIndexes"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.Delete"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.Drop"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.DropIndexes"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.Insert"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.Modify"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.RefineCollectionShardKey"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.ReshardCollection"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.ShardCollection"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.Update"/></description></item>
/// </list>
/// </summary>
/// <value>
/// The UUID of the collection.
/// </value>
public Guid? CollectionUuid => GetValue<Guid?>(nameof(CollectionUuid), null);

/// <summary>
/// Gets the database namespace.
/// </summary>
Expand Down Expand Up @@ -122,6 +144,25 @@ public TDocument FullDocumentBeforeChange
}
}

/// <summary>
/// Gets the description for the operation.
/// Only present when the showExpandedEvents change stream option is enabled and for the following event types (MongoDB 6.0 and later):
/// <list type="bullet">
/// <item><description><see cref="ChangeStreamOperationType.Create"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.CreateIndexes"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.DropIndexes"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.Modify"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.RefineCollectionShardKey"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.Rename"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.ReshardCollection"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.ShardCollection"/></description></item>
/// </list>
/// </summary>
/// <value>
/// The description of the operation.
/// </value>
public BsonDocument OperationDescription => GetValue<BsonDocument>(nameof(OperationDescription), null);

/// <summary>
/// Gets the type of the operation.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions src/MongoDB.Driver.Core/ChangeStreamDocumentSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ public ChangeStreamDocumentSerializer(

RegisterMember("ClusterTime", "clusterTime", BsonTimestampSerializer.Instance);
RegisterMember("CollectionNamespace", "ns", ChangeStreamDocumentCollectionNamespaceSerializer.Instance);
RegisterMember("CollectionUuid", "ui", GuidSerializer.StandardInstance);
RegisterMember("DatabaseNamespace", "ns", ChangeStreamDocumentDatabaseNamespaceSerializer.Instance);
RegisterMember("DocumentKey", "documentKey", BsonDocumentSerializer.Instance);
RegisterMember("FullDocument", "fullDocument", _documentSerializer);
RegisterMember("FullDocumentBeforeChange", "fullDocumentBeforeChange", _documentSerializer);
RegisterMember("OperationDescription", "operationDescription", BsonDocumentSerializer.Instance);
RegisterMember("OperationType", "operationType", ChangeStreamOperationTypeSerializer.Instance);
RegisterMember("RenameTo", "to", ChangeStreamDocumentCollectionNamespaceSerializer.Instance);
RegisterMember("ResumeToken", "_id", BsonDocumentSerializer.Instance);
Expand Down
30 changes: 29 additions & 1 deletion src/MongoDB.Driver.Core/ChangeStreamOperationType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,34 @@ public enum ChangeStreamOperationType
/// <summary>
/// A dropDatabase operation type.
/// </summary>
DropDatabase
DropDatabase,
/// <summary>
/// A createIndexes operation type.
/// </summary>
CreateIndexes,
/// <summary>
/// A dropIndexes operation type.
/// </summary>
DropIndexes,
/// <summary>
/// A modify operation type.
/// </summary>
Modify,
/// <summary>
/// A create operation type.
/// </summary>
Create,
/// <summary>
/// A shardCollection operation type.
/// </summary>
ShardCollection,
/// <summary>
/// A refineCollectionShardKey operation type.
/// </summary>
RefineCollectionShardKey,
/// <summary>
/// A reshardCollection operation type.
/// </summary>
ReshardCollection
}
}
14 changes: 14 additions & 0 deletions src/MongoDB.Driver.Core/ChangeStreamOperationTypeSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ public override ChangeStreamOperationType Deserialize(BsonDeserializationContext
case "rename": return ChangeStreamOperationType.Rename;
case "drop": return ChangeStreamOperationType.Drop;
case "dropDatabase": return ChangeStreamOperationType.DropDatabase;
case "createIndexes": return ChangeStreamOperationType.CreateIndexes;
case "dropIndexes": return ChangeStreamOperationType.DropIndexes;
case "modify": return ChangeStreamOperationType.Modify;
case "create": return ChangeStreamOperationType.Create;
case "shardCollection": return ChangeStreamOperationType.ShardCollection;
case "refineCollectionShardKey": return ChangeStreamOperationType.RefineCollectionShardKey;
case "reshardCollection": return ChangeStreamOperationType.ReshardCollection;
default: return (ChangeStreamOperationType)(-1);
}
}
Expand All @@ -73,6 +80,13 @@ public override void Serialize(BsonSerializationContext context, BsonSerializati
case ChangeStreamOperationType.Rename: writer.WriteString("rename"); break;
case ChangeStreamOperationType.Drop: writer.WriteString("drop"); break;
case ChangeStreamOperationType.DropDatabase: writer.WriteString("dropDatabase"); break;
case ChangeStreamOperationType.CreateIndexes: writer.WriteString("createIndexes"); break;
case ChangeStreamOperationType.DropIndexes: writer.WriteString("dropIndexes"); break;
case ChangeStreamOperationType.Modify: writer.WriteString("modify"); break;
case ChangeStreamOperationType.Create: writer.WriteString("create"); break;
case ChangeStreamOperationType.ShardCollection: writer.WriteString("shardCollection"); break;
case ChangeStreamOperationType.RefineCollectionShardKey: writer.WriteString("refineCollectionShardKey"); break;
case ChangeStreamOperationType.ReshardCollection: writer.WriteString("reshardCollection"); break;
default: throw new ArgumentException($"Invalid ChangeStreamOperationType: {value}.", nameof(value));
}
}
Expand Down
19 changes: 18 additions & 1 deletion src/MongoDB.Driver.Core/Core/Operations/ChangeStreamOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ public interface IChangeStreamOperation<TResult> : IReadOperation<IChangeStreamC
/// </value>
BsonDocument ResumeAfter { get; set; }

/// <summary>
/// Gets or sets whether the change stream should show expanded events (MongoDB 6.0 and later).
/// </summary>
/// <value>
/// The value.
/// </value>
bool? ShowExpandedEvents { get; set; }

/// <summary>
/// Gets or sets the start after value.
/// </summary>
Expand Down Expand Up @@ -97,6 +105,7 @@ public class ChangeStreamOperation<TResult> : IChangeStreamOperation<TResult>
private readonly IBsonSerializer<TResult> _resultSerializer;
private BsonDocument _resumeAfter;
private bool _retryRequested;
private bool? _showExpandedEvents;
private BsonDocument _startAfter;
private BsonTimestamp _startAtOperationTime;

Expand Down Expand Up @@ -290,6 +299,13 @@ public bool RetryRequested
set => _retryRequested = value;
}

/// <inheritdoc />
public bool? ShowExpandedEvents
{
get => _showExpandedEvents;
set => _showExpandedEvents = value;
}

/// <inheritdoc />
public BsonDocument StartAfter
{
Expand Down Expand Up @@ -432,7 +448,8 @@ private BsonDocument CreateChangeStreamStage()
{ "fullDocument", () => ToString(_fullDocument), _fullDocument != ChangeStreamFullDocumentOption.Default },
{ "fullDocumentBeforeChange", () => ToString(_fullDocumentBeforeChangeOption), _fullDocumentBeforeChangeOption != ChangeStreamFullDocumentBeforeChangeOption.Default },
{ "allChangesForCluster", true, _collectionNamespace == null && _databaseNamespace == null },
{ "startAfter", _startAfter, _startAfter != null},
{ "showExpandedEvents", _showExpandedEvents, _showExpandedEvents.HasValue },
{ "startAfter", _startAfter, _startAfter != null },
{ "startAtOperationTime", _startAtOperationTime, _startAtOperationTime != null },
{ "resumeAfter", _resumeAfter, _resumeAfter != null }
};
Expand Down
3 changes: 2 additions & 1 deletion src/MongoDB.Driver/ChangeStreamHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private static void SetOperationOptions<TResult>(
ChangeStreamOptions options,
ReadConcern readConcern)
{
options = options ?? new ChangeStreamOptions();
options ??= new ChangeStreamOptions();

operation.BatchSize = options.BatchSize;
operation.Collation = options.Collation;
Expand All @@ -155,6 +155,7 @@ private static void SetOperationOptions<TResult>(
operation.MaxAwaitTime = options.MaxAwaitTime;
operation.ReadConcern = readConcern;
operation.ResumeAfter = options.ResumeAfter;
operation.ShowExpandedEvents = options.ShowExpandedEvents;
operation.StartAfter = options.StartAfter;
operation.StartAtOperationTime = options.StartAtOperationTime;
}
Expand Down
23 changes: 23 additions & 0 deletions src/MongoDB.Driver/ChangeStreamOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class ChangeStreamOptions
private ChangeStreamFullDocumentBeforeChangeOption _fullDocumentBeforeChange = ChangeStreamFullDocumentBeforeChangeOption.Default;
private TimeSpan? _maxAwaitTime;
private BsonDocument _resumeAfter;
private bool? _showExpandedEvents;
private BsonDocument _startAfter;
private BsonTimestamp _startAtOperationTime;

Expand Down Expand Up @@ -120,6 +121,28 @@ public BsonDocument ResumeAfter
set { _resumeAfter = value; }
}

/// <summary>
/// Gets or sets whether the change stream should show expanded events (MongoDB 6.0 and later).
/// Expanded change stream events include:
/// <list type="bullet">
/// <item><description><see cref="ChangeStreamOperationType.Create"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.CreateIndexes"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.DropIndexes"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.Modify"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.RefineCollectionShardKey"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.ReshardCollection"/></description></item>
/// <item><description><see cref="ChangeStreamOperationType.ShardCollection"/></description></item>
/// </list>
/// </summary>
/// <value>
/// The value.
/// </value>
public bool? ShowExpandedEvents
{
get { return _showExpandedEvents; }
set { _showExpandedEvents = value; }
}

/// <summary>
/// Gets or sets the start after.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ public void constructor_should_initialize_instance()
var result = new ChangeStreamDocumentSerializer<BsonDocument>(documentSerializer);

result._documentSerializer().Should().BeSameAs(documentSerializer);
result._memberSerializationInfo().Count.Should().Be(11);
result._memberSerializationInfo().Count.Should().Be(13);
AssertRegisteredMember(result, "ClusterTime", "clusterTime", BsonTimestampSerializer.Instance);
AssertRegisteredMember(result, "CollectionNamespace", "ns", ChangeStreamDocumentCollectionNamespaceSerializer.Instance);
AssertRegisteredMember(result, "CollectionUuid", "ui", GuidSerializer.StandardInstance);
AssertRegisteredMember(result, "DatabaseNamespace", "ns", ChangeStreamDocumentDatabaseNamespaceSerializer.Instance);
AssertRegisteredMember(result, "DocumentKey", "documentKey", BsonDocumentSerializer.Instance);
AssertRegisteredMember(result, "FullDocument", "fullDocument", documentSerializer);
AssertRegisteredMember(result, "FullDocumentBeforeChange", "fullDocumentBeforeChange", documentSerializer);
AssertRegisteredMember(result, "OperationDescription", "operationDescription", BsonDocumentSerializer.Instance);
AssertRegisteredMember(result, "OperationType", "operationType", ChangeStreamOperationTypeSerializer.Instance);
AssertRegisteredMember(result, "RenameTo", "to", ChangeStreamDocumentCollectionNamespaceSerializer.Instance);
AssertRegisteredMember(result, "ResumeToken", "_id", BsonDocumentSerializer.Instance);
Expand Down
46 changes: 46 additions & 0 deletions tests/MongoDB.Driver.Core.Tests/ChangeStreamDocumentTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,29 @@ public void CollectionNamespace_should_allow_extra_elements()
subject.BackingDocument["ns"]["newField2"]["x"].AsInt32.Should().Be(1);
}

[Fact]
public void CollectionUuid_should_return_expected_result()
{
var value = Guid.NewGuid();
var backingDocument = new BsonDocument { { "other", 1 }, { "ui", new BsonBinaryData(value, GuidRepresentation.Standard) } };
var subject = CreateSubject(backingDocument: backingDocument);

var result = subject.CollectionUuid;

result.Should().Be(value);
}

[Fact]
public void CollectionUuid_should_return_null_when_not_present()
{
var backingDocument = new BsonDocument { { "other", 1 } };
var subject = CreateSubject(backingDocument: backingDocument);

var result = subject.CollectionUuid;

result.Should().NotHaveValue();
}

[Fact]
public void DatabaseNamespace_should_return_expected_result()
{
Expand Down Expand Up @@ -289,6 +312,29 @@ public void FullDocument_should_return_null_when_not_present()
result.Should().BeNull();
}

[Fact]
public void OperationDescription_should_return_expected_result()
{
var value = new BsonDocument("x", 1234);
var backingDocument = new BsonDocument { { "other", 1 }, { "operationDescription", value } };
var subject = CreateSubject(backingDocument: backingDocument);

var result = subject.OperationDescription;

result.Should().Be(value);
}

[Fact]
public void OperationDescription_should_return_null_when_not_present()
{
var backingDocument = new BsonDocument { { "other", 1 } };
var subject = CreateSubject(backingDocument: backingDocument);

var result = subject.OperationDescription;

result.Should().BeNull();
}

[Theory]
[InlineData("insert", ChangeStreamOperationType.Insert)]
[InlineData("update", ChangeStreamOperationType.Update)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ public class ChangeStreamOperationTypeSerializerTests
[InlineData("\"rename\"", ChangeStreamOperationType.Rename)]
[InlineData("\"drop\"", ChangeStreamOperationType.Drop)]
[InlineData("\"dropDatabase\"", ChangeStreamOperationType.DropDatabase)]
[InlineData("\"createIndexes\"", ChangeStreamOperationType.CreateIndexes)]
[InlineData("\"dropIndexes\"", ChangeStreamOperationType.DropIndexes)]
[InlineData("\"modify\"", ChangeStreamOperationType.Modify)]
[InlineData("\"create\"", ChangeStreamOperationType.Create)]
[InlineData("\"shardCollection\"", ChangeStreamOperationType.ShardCollection)]
[InlineData("\"refineCollectionShardKey\"", ChangeStreamOperationType.RefineCollectionShardKey)]
[InlineData("\"reshardCollection\"", ChangeStreamOperationType.ReshardCollection)]
public void Deserialize_should_return_expected_result(string json, ChangeStreamOperationType expectedResult)
{
var subject = CreateSubject();
Expand Down Expand Up @@ -72,6 +79,13 @@ public void Deserialize_should_return_negative_one_when_input_is_invalid()
[InlineData(ChangeStreamOperationType.Rename, "\"rename\"")]
[InlineData(ChangeStreamOperationType.Drop, "\"drop\"")]
[InlineData(ChangeStreamOperationType.DropDatabase, "\"dropDatabase\"")]
[InlineData(ChangeStreamOperationType.CreateIndexes, "\"createIndexes\"")]
[InlineData(ChangeStreamOperationType.DropIndexes, "\"dropIndexes\"")]
[InlineData(ChangeStreamOperationType.Modify, "\"modify\"")]
[InlineData(ChangeStreamOperationType.Create, "\"create\"")]
[InlineData(ChangeStreamOperationType.ShardCollection, "\"shardCollection\"")]
[InlineData(ChangeStreamOperationType.RefineCollectionShardKey, "\"refineCollectionShardKey\"")]
[InlineData(ChangeStreamOperationType.ReshardCollection, "\"reshardCollection\"")]
public void Serialize_should_have_expected_result(ChangeStreamOperationType value, string expectedResult)
{
var subject = CreateSubject();
Expand All @@ -90,7 +104,7 @@ public void Serialize_should_have_expected_result(ChangeStreamOperationType valu

[Theory]
[InlineData(-1)]
[InlineData(8)]
[InlineData(15)]
public void Serialize_should_throw_when_value_is_invalid(int valueAsInt)
{
var subject = CreateSubject();
Expand Down
Loading

0 comments on commit fc1602e

Please sign in to comment.