Skip to content
13 changes: 9 additions & 4 deletions com.unity.netcode.gameobjects/Runtime/Core/NetworkManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -471,9 +471,15 @@ public void NetworkUpdate(NetworkUpdateStage updateStage)

// This should be invoked just prior to the MessageManager processes its outbound queue.
SceneManager.CheckForAndSendNetworkObjectSceneChanged();

// Process outbound messages
MessageManager.ProcessSendQueues();
#if UNIFIED_NETCODE
if (!NetworkConfig.Prefabs.HasGhostPrefabs)
{
#endif
// Process outbound messages
MessageManager.ProcessSendQueues();
#if UNIFIED_NETCODE
}
#endif
Comment on lines +474 to +482
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#if UNIFIED_NETCODE
if (!NetworkConfig.Prefabs.HasGhostPrefabs)
{
#endif
// Process outbound messages
MessageManager.ProcessSendQueues();
#if UNIFIED_NETCODE
}
#endif
#if !UNIFIED_NETCODE
// Process outbound messages
MessageManager.ProcessSendQueues();
#endif

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps leave an XML comment with a cref to UnifiedNetcodeUpdateSystem.Update saying something about messages being processed prior to when N4E sends them?


// Metrics update needs to be driven by NetworkConnectionManager's update to assure metrics are dispatched after the send queue is processed.
MetricsManager.UpdateMetrics();
Expand Down Expand Up @@ -1354,7 +1360,6 @@ private bool CanStart(StartType type)
/// The world instance assigned to this NetworkManager instance.
/// </summary>
public NetcodeWorld NetcodeWorld { get; internal set; }

internal void InitializeNetcodeWorld()
{
if (NetcodeWorld != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,37 +43,41 @@ public static NativeArray<byte> ToNativeArray(in FixedBytes1280 data)
}
}

internal struct TransportRpcData : IBufferElementData
{
public FixedBytes1280 Buffer;
}

[BurstCompile]
internal struct TransportRpc : IOutOfBandRpcCommand, IRpcCommandSerializer<TransportRpc>
{
public FixedBytes1280 Buffer;
public ulong Order;
public TransportRpcData Value;

public unsafe void Serialize(ref DataStreamWriter writer, in RpcSerializerState state, in TransportRpc data)
{
writer.WriteULong(data.Order);
writer.WriteInt(data.Buffer.Length);
var span = new Span<byte>(FixedBytes1280.GetUnsafePtr(data.Buffer), data.Buffer.Length);
writer.WriteInt(data.Value.Buffer.Length);
var span = new Span<byte>(FixedBytes1280.GetUnsafePtr(data.Value.Buffer), data.Value.Buffer.Length);
writer.WriteBytes(span);
}

public unsafe void Deserialize(ref DataStreamReader reader, in RpcDeserializerState state, ref TransportRpc data)
{
data.Order = reader.ReadULong();
var length = reader.ReadInt();
data.Buffer = new FixedBytes1280
data.Value.Buffer = new FixedBytes1280
{
Length = length
};

var span = new Span<byte>(FixedBytes1280.GetUnsafePtr(data.Buffer), length);
var span = new Span<byte>(FixedBytes1280.GetUnsafePtr(data.Value.Buffer), length);
reader.ReadBytes(span);
}

[BurstCompile(DisableDirectCall = true)]
private static void InvokeExecute(ref RpcExecutor.Parameters parameters)
{
RpcExecutor.ExecuteCreateRequestComponent<TransportRpc, TransportRpc>(ref parameters);
var element = new TransportRpc();
element.Deserialize(ref parameters.Reader, parameters.DeserializerState, ref element);
parameters.CommandBuffer.AppendToBuffer(parameters.JobIndex, parameters.Connection, element.Value);
}

private static readonly PortableFunctionPointer<RpcExecutor.ExecuteDelegate> k_InvokeExecuteFunctionPointer = new PortableFunctionPointer<RpcExecutor.ExecuteDelegate>(InvokeExecute);
Expand Down Expand Up @@ -115,33 +119,57 @@ public void OnUpdate(ref SystemState state)
}
}

[WorldSystemFilter(WorldSystemFilterFlags.ServerSimulation | WorldSystemFilterFlags.ClientSimulation | WorldSystemFilterFlags.ThinClientSimulation)]
[UpdateInGroup(typeof(SimulationSystemGroup), OrderLast = true)]
[UpdateBefore(typeof(RpcSystem))]
internal partial class UnifiedNetcodeUpdateSystem : SystemBase
{
public void OnCreate(ref SystemState state)
{
state.RequireForUpdate<RpcCollection>();
state.RequireForUpdate<NetworkId>();
}

public UnifiedNetcodeTransport Transport;
public NetworkManager NetworkManager;

public List<Connection> DisconnectQueue = new List<Connection>();

public void Disconnect(Connection connection)
{
DisconnectQueue.Add(connection);
}

public void SendRpc(TransportRpc rpc, Entity connectionEntity)
{
var rpcQueue = SystemAPI.GetSingleton<RpcCollection>().GetRpcQueue<TransportRpc, TransportRpc>();
var ghostInstance = GetComponentLookup<GhostInstance>();
var rpcDataStreamBuffer = EntityManager.GetBuffer<OutgoingOutOfBandRpcDataStreamBuffer>(connectionEntity);
rpcQueue.Schedule(rpcDataStreamBuffer, ghostInstance, rpc);
}

protected override void OnUpdate()
{
NetworkManager.MessageManager.ProcessSendQueues();

using var commandBuffer = new EntityCommandBuffer(Allocator.Temp);
foreach (var (request, rpc, entity) in SystemAPI.Query<RefRO<ReceiveRpcCommandRequest>, RefRO<TransportRpc>>().WithEntityAccess())
foreach(var (networkId, _, entity) in SystemAPI.Query<RefRO<NetworkId>, RefRO<NetworkStreamConnection>>().WithEntityAccess())
{
var connectionId = SystemAPI.GetComponent<NetworkId>(request.ValueRO.SourceConnection).Value;

var buffer = rpc.ValueRO.Buffer;
try
{
Transport.DispatchMessage(connectionId, buffer, rpc.ValueRO.Order);
}
finally
var connectionId = networkId.ValueRO.Value;
DynamicBuffer<TransportRpcData> rpcs = EntityManager.GetBuffer<TransportRpcData>(entity);
foreach (var rpc in rpcs)
{
commandBuffer.DestroyEntity(entity);
var buffer = rpc.Buffer;
try
{
Transport.DispatchMessage(connectionId, buffer);
}
catch(Exception e)
{
Debug.LogException(e);
}
}
rpcs.Clear();
}

foreach (var connection in DisconnectQueue)
Expand Down Expand Up @@ -171,34 +199,15 @@ private class ConnectionInfo
public BatchedSendQueue SendQueue;
public BatchedReceiveQueue ReceiveQueue;
public Connection Connection;
public ulong LastSent;
public ulong LastReceived;
public Dictionary<ulong, FixedBytes1280> DeferredMessages;
}

private Dictionary<int, ConnectionInfo> m_Connections;

internal void DispatchMessage(int connectionId, in FixedBytes1280 buffer, ulong order)
internal void DispatchMessage(int connectionId, in FixedBytes1280 buffer)
{
var connectionInfo = m_Connections[connectionId];

if (order <= connectionInfo.LastReceived)
{
Debug.LogWarning("Received duplicate message, ignoring.");
return;
}

if (order != connectionInfo.LastReceived + 1)
{
if (connectionInfo.DeferredMessages == null)
{
connectionInfo.DeferredMessages = new Dictionary<ulong, FixedBytes1280>();
}

connectionInfo.DeferredMessages[order] = buffer;
return;
}

using var arr = FixedBytes1280.ToNativeArray(buffer);
var reader = new DataStreamReader(arr);
if (connectionInfo.ReceiveQueue == null)
Expand All @@ -209,20 +218,7 @@ internal void DispatchMessage(int connectionId, in FixedBytes1280 buffer, ulong
{
connectionInfo.ReceiveQueue.PushReader(reader);
}

connectionInfo.LastReceived = order;
if (connectionInfo.DeferredMessages != null)
{
var next = order + 1;
while (connectionInfo.DeferredMessages.Remove(next, out var nextBuffer))
{
reader = new DataStreamReader(FixedBytes1280.ToNativeArray(nextBuffer));
connectionInfo.ReceiveQueue.PushReader(reader);
connectionInfo.LastReceived = next;
++next;
}
}


var message = connectionInfo.ReceiveQueue.PopMessage();
while (message.Count != 0)
{
Expand All @@ -243,20 +239,15 @@ public override unsafe void Send(ulong clientId, ArraySegment<byte> payload, Net

while (!connectionInfo.SendQueue.IsEmpty)
{
var rpc = new TransportRpc
{
Buffer = new FixedBytes1280(),
};
var rpc = new TransportRpc();

var writer = new DataStreamWriter(FixedBytes1280.GetUnsafePtr(rpc.Buffer), k_MaxPacketSize);
var writer = new DataStreamWriter(FixedBytes1280.GetUnsafePtr(rpc.Value.Buffer), k_MaxPacketSize);

var amount = connectionInfo.SendQueue.FillWriterWithBytes(ref writer, k_MaxPacketSize);
rpc.Buffer.Length = amount;
rpc.Order = ++connectionInfo.LastSent;

var req = m_NetworkManager.NetcodeWorld.EntityManager.CreateEntity(ComponentType.ReadWrite<SendRpcCommandRequest>(), ComponentType.ReadWrite<TransportRpc>());
m_NetworkManager.NetcodeWorld.EntityManager.SetComponentData(req, new SendRpcCommandRequest{TargetConnection = connectionInfo.Connection.ConnectionEntity});
m_NetworkManager.NetcodeWorld.EntityManager.SetComponentData(req, rpc);
rpc.Value.Buffer.Length = amount;

var updateSystem = m_NetworkManager.NetcodeWorld.GetExistingSystemManaged<UnifiedNetcodeUpdateSystem>();
updateSystem.SendRpc(rpc, connectionInfo.Connection.ConnectionEntity);

connectionInfo.SendQueue.Consume(amount);
}
Expand All @@ -280,6 +271,8 @@ private void OnClientConnectedToServer(Connection connection, NetCodeConnectionE
};
m_ServerClientId = connection.NetworkId.Value;
InvokeOnTransportEvent(NetworkEvent.Connect, (ulong)connection.NetworkId.Value, default, m_RealTimeProvider.RealTimeSinceStartup);
var updateSystem = m_NetworkManager.NetcodeWorld.GetExistingSystemManaged<UnifiedNetcodeUpdateSystem>();
updateSystem.EntityManager.AddBuffer<TransportRpcData>(connection.ConnectionEntity);
}

private void OnServerNewClientConnection(Connection connection, NetCodeConnectionEvent connectionEvent)
Expand All @@ -291,6 +284,8 @@ private void OnServerNewClientConnection(Connection connection, NetCodeConnectio
Connection = connection
}; ;
InvokeOnTransportEvent(NetworkEvent.Connect, (ulong)connection.NetworkId.Value, default, m_RealTimeProvider.RealTimeSinceStartup);
var updateSystem = m_NetworkManager.NetcodeWorld.GetExistingSystemManaged<UnifiedNetcodeUpdateSystem>();
updateSystem.EntityManager.AddBuffer<TransportRpcData>(connection.ConnectionEntity);
}

private const string k_InvalidRpcMessage = "An invalid RPC was received";
Expand Down Expand Up @@ -403,6 +398,7 @@ public override bool StartClient()
m_NetworkManager.NetcodeWorld.OnConnectionEvent += OnClientConnectionEvent;
var updateSystem = m_NetworkManager.NetcodeWorld.GetExistingSystemManaged<UnifiedNetcodeUpdateSystem>();
updateSystem.Transport = this;
updateSystem.NetworkManager = m_NetworkManager;
return true;
}

Expand All @@ -416,6 +412,7 @@ public override bool StartServer()
m_NetworkManager.NetcodeWorld.OnConnectionEvent += OnServerConnectionEvent;
var updateSystem = m_NetworkManager.NetcodeWorld.GetExistingSystemManaged<UnifiedNetcodeUpdateSystem>();
updateSystem.Transport = this;
updateSystem.NetworkManager = m_NetworkManager;
return true;
}

Expand Down