Skip to content

Commit

Permalink
2104 GDK QFE7 (#586)
Browse files Browse the repository at this point in the history
* 2104 GDK QFE7
  • Loading branch information
natiskan authored Mar 29, 2022
1 parent 9eb03db commit 64cb8f2
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ HRESULT Connection::AddSubscription(
// If our connection is active, immediately register with RTA service
if (m_state == XblRealTimeActivityConnectionState::Connected)
{
return SendSubscribeMessage(sub);
return SendSubscribeMessage(sub, std::move(lock));
}
return S_OK;
}
Expand Down Expand Up @@ -293,7 +293,7 @@ HRESULT Connection::RemoveSubscription(
{
// Unregister subscription from RTA service
m_unsubscribeAsyncContexts[sub->m_state->clientId] = std::move(async);
return SendUnsubscribeMessage(sub);
return SendUnsubscribeMessage(sub, std::move(lock));
}
case Subscription::State::ServiceStatus::PendingSubscribe:
{
Expand Down Expand Up @@ -334,9 +334,7 @@ size_t Connection::SubscriptionCount() const noexcept
return m_subs.size();
}

HRESULT Connection::SendSubscribeMessage(
std::shared_ptr<Subscription> sub
) const noexcept
JsonDocument Connection::AssembleSubscribeMessage(std::shared_ptr<Subscription> sub) const noexcept
{
// Payload format [<API_ID>, <SEQUENCE_N>, “<RESOURCE_URI>”]

Expand All @@ -349,14 +347,32 @@ HRESULT Connection::SendSubscribeMessage(
request.PushBack(sub->m_state->clientId, a);
request.PushBack(JsonValue{ sub->m_resourceUri.data(), a }, a);

return request;
}

HRESULT Connection::SendSubscribeMessage(
std::shared_ptr<Subscription> sub,
std::unique_lock<std::mutex>&& lock
) const noexcept
{
JsonDocument request = AssembleSubscribeMessage(sub);

lock.unlock();

return SendAssembledMessage(request);
}

HRESULT Connection::SendAssembledMessage(_In_ const JsonValue& request) const noexcept
{
String requestString{ JsonUtils::SerializeJson(request) };
LOGS_DEBUG << __FUNCTION__ << "[" << this << "]: " << requestString;

return m_websocket->Send(requestString.data());
}

HRESULT Connection::SendUnsubscribeMessage(
std::shared_ptr<Subscription> sub
std::shared_ptr<Subscription> sub,
std::unique_lock<std::mutex>&& lock
) const noexcept
{
// Payload format [<API_ID>, <SEQUENCE_N>, <SUB_ID>]
Expand All @@ -370,6 +386,8 @@ HRESULT Connection::SendUnsubscribeMessage(
request.PushBack(sub->m_state->clientId, a);
request.PushBack(sub->m_state->serviceId, a);

lock.unlock();

String requestString{ JsonUtils::SerializeJson(request) };
LOGS_DEBUG << __FUNCTION__ << "[" << this << "]: " << requestString;

Expand Down Expand Up @@ -403,6 +421,9 @@ void Connection::SubscribeResponseHandler(_In_ const JsonValue& message) noexcep

m_activeSubs[sub->m_state->serviceId] = sub;

AsyncContext<Result<void>> asyncContext{ std::move(m_subscribeAsyncContexts[sub->m_state->clientId]) };
m_subscribeAsyncContexts.erase(sub->m_state->clientId);

switch (sub->m_state->serviceStatus)
{
case Subscription::State::ServiceStatus::Subscribing:
Expand All @@ -414,7 +435,7 @@ void Connection::SubscribeResponseHandler(_In_ const JsonValue& message) noexcep
{
// Client has removed the subscription while subscribe handshake was happening,
// so immediately begin unsubscribing.
SendUnsubscribeMessage(sub);
SendUnsubscribeMessage(sub, std::move(lock));
break;
}
default:
Expand All @@ -425,10 +446,10 @@ void Connection::SubscribeResponseHandler(_In_ const JsonValue& message) noexcep
}
}

AsyncContext<Result<void>> asyncContext{ std::move(m_subscribeAsyncContexts[sub->m_state->clientId]) };
m_subscribeAsyncContexts.erase(sub->m_state->clientId);

lock.unlock();
if (lock)
{
lock.unlock();
}

asyncContext.Complete(ConvertRTAErrorCode(errorCode));
sub->OnSubscribe(data);
Expand Down Expand Up @@ -520,6 +541,9 @@ void Connection::UnsubscribeResponseHandler(_In_ const JsonValue& message) noexc

LOGS_DEBUG << __FUNCTION__ << ": [" << sub->m_state->clientId <<"] ServiceStatus=" << EnumName(sub->m_state->serviceStatus);

AsyncContext<Result<void>> asyncContext{ std::move(m_unsubscribeAsyncContexts[clientId]) };
m_unsubscribeAsyncContexts.erase(clientId);

switch (sub->m_state->serviceStatus)
{
case Subscription::State::ServiceStatus::Unsubscribing:
Expand All @@ -533,7 +557,7 @@ void Connection::UnsubscribeResponseHandler(_In_ const JsonValue& message) noexc
{
// Client has re-added the subscription while unsubscibe handshake was happening,
// so immediately begin subscribing.
SendSubscribeMessage(sub);
SendSubscribeMessage(sub, std::move(lock));
break;
}
default:
Expand All @@ -543,10 +567,10 @@ void Connection::UnsubscribeResponseHandler(_In_ const JsonValue& message) noexc
}
}

AsyncContext<Result<void>> asyncContext{ std::move(m_unsubscribeAsyncContexts[clientId]) };
m_unsubscribeAsyncContexts.erase(clientId);

lock.unlock();
if (lock)
{
lock.unlock();
}

asyncContext.Complete(ConvertRTAErrorCode(errorCode));
}
Expand Down Expand Up @@ -602,10 +626,12 @@ void Connection::ConnectCompleteHandler(WebsocketResult result) noexcept
m_connectTime = std::chrono::system_clock::now();

assert(m_activeSubs.empty());

List<JsonDocument> subMessages{};
for (auto& pair : m_subs)
{
assert(pair.second->m_state->serviceStatus == Subscription::State::ServiceStatus::Inactive);
SendSubscribeMessage(pair.second);
subMessages.push_back(AssembleSubscribeMessage(pair.second));
}

// RTA v2 has a lifetime of 2 hours. After 2 hours RTA service will disconnect the title. On some platforms
Expand All @@ -629,6 +655,13 @@ void Connection::ConnectCompleteHandler(WebsocketResult result) noexcept
},
CONNECTION_TIMEOUT_MS
);

lock.unlock();

for (auto& request : subMessages)
{
SendAssembledMessage(request);
}
}
else
{
Expand Down Expand Up @@ -663,7 +696,11 @@ void Connection::ConnectCompleteHandler(WebsocketResult result) noexcept
);
}

lock.unlock();
if (lock)
{
lock.unlock();
}

m_stateChangedHandler(m_state);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,21 @@ class Connection : public std::enable_shared_from_this<Connection>
ResyncHandler resyncHandler
) noexcept;

JsonDocument AssembleSubscribeMessage(std::shared_ptr<Subscription> sub) const noexcept;

// RTA protocol implementation
HRESULT SendSubscribeMessage(
std::shared_ptr<Subscription> subscription
std::shared_ptr<Subscription> subscription,
std::unique_lock<std::mutex>&& lock
) const noexcept;

HRESULT SendUnsubscribeMessage(
std::shared_ptr<Subscription> subscription
std::shared_ptr<Subscription> subscription,
std::unique_lock<std::mutex>&& lock
) const noexcept;

HRESULT SendAssembledMessage(_In_ const JsonValue& message) const noexcept;

void SubscribeResponseHandler(_In_ const JsonValue& message) noexcept;
void UnsubscribeResponseHandler(_In_ const JsonValue& message) noexcept;
void EventHandler(_In_ const JsonValue& message) const noexcept;
Expand Down
2 changes: 1 addition & 1 deletion Source/Shared/build_version.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
//*********************************************************
#pragma once

#define XBOX_SERVICES_API_VERSION_STRING "2021.04.20220111.6"
#define XBOX_SERVICES_API_VERSION_STRING "2021.04.20220225.7"

0 comments on commit 64cb8f2

Please sign in to comment.