Skip to content
14 changes: 14 additions & 0 deletions fakes/WebsocketFake.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,24 @@

use Evenement\EventEmitter;
use JsonSerializable;
use Ragnarok\Fenrir\Buffer\BufferInterface;
use Ragnarok\Fenrir\Buffer\Passthrough;
use Ragnarok\Fenrir\WebsocketInterface;
use React\Promise\PromiseInterface;

class WebsocketFake extends EventEmitter implements WebsocketInterface
{
public array $openings = [];
public array $closings = [];

public function __construct(public BufferInterface $buffer = new Passthrough())
{
}

public function getBuffer(): BufferInterface
{
return $this->buffer;
}

public function open(string $url): PromiseInterface
{
Expand All @@ -22,6 +34,8 @@ public function open(string $url): PromiseInterface

public function close(int $code, string $reason): void
{
$this->closings[] = [$code, $reason];
$this->buffer->reset();
}

public function send(string $message, bool $useBucket = true): void
Expand Down
15 changes: 15 additions & 0 deletions src/Buffer/BufferInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Ragnarok\Fenrir\Buffer;

use Closure;

interface BufferInterface
{
public function partialMessage(string $partial): void;
public function onCompleteMessage(Closure $handler): void;
public function additionalQueryData(): array;
public function reset(): void;
}
61 changes: 61 additions & 0 deletions src/Buffer/Multilayer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?php

declare(strict_types=1);

namespace Ragnarok\Fenrir\Buffer;

use Closure;

class Multilayer implements BufferInterface
{
private Closure $handler;

private BufferInterface $first;

/** @param BufferInterface[] $buffers */
public function __construct(public readonly array $buffers)
{
$this->handler = fn () => null;

$keys = array_keys($buffers);

foreach ($keys as $key => $bufferKey) {
$buffer = $this->buffers[$bufferKey];

if (isset($keys[$key + 1])) {
$next = $this->buffers[$keys[$key + 1]];
$buffer->onCompleteMessage(fn (string $message) => $next->partialMessage($message));

continue;
}

$buffer->onCompleteMessage(fn (string $message) => ($this->handler)($message));
}

$this->first = $this->buffers[$keys[0]];
}

public function partialMessage(string $partial): void
{
$this->first->partialMessage($partial);
}

public function onCompleteMessage(Closure $handler): void
{
$this->handler = $handler;
}

public function additionalQueryData(): array
{
return array_merge(
...array_map(fn (BufferInterface $buffer) => $buffer->additionalQueryData(), $this->buffers)
);
}

public function reset(): void
{
foreach ($this->buffers as $buffer) {
$buffer->reset();
}
}
}
36 changes: 36 additions & 0 deletions src/Buffer/Passthrough.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

declare(strict_types=1);

namespace Ragnarok\Fenrir\Buffer;

use Closure;

class Passthrough implements BufferInterface
{
private Closure $completeHandler;

public function __construct()
{
$this->completeHandler = fn () => null;
}

public function partialMessage(string $partial): void
{
($this->completeHandler)($partial);
}

public function onCompleteMessage(Closure $handler): void
{
$this->completeHandler = $handler;
}

public function additionalQueryData(): array
{
return [];
}

public function reset(): void
{
}
}
66 changes: 66 additions & 0 deletions src/Buffer/ZlibStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?php

declare(strict_types=1);

namespace Ragnarok\Fenrir\Buffer;

use Closure;
use InflateContext;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;

class ZlibStream implements BufferInterface
{
private Closure $completeHandler;

private const SUFFIX = "\x00\x00\xff\xff";

private string $buffer = '';

private InflateContext $inflator;

public function __construct(private readonly LoggerInterface $logger = new NullLogger())
{
$this->completeHandler = fn () => null;
$this->inflator = inflate_init(ZLIB_ENCODING_DEFLATE);
}

public function reset(): void
{
$this->logger->debug('Resetting Buffer');

$this->buffer = '';
$this->inflator = inflate_init(ZLIB_ENCODING_DEFLATE);
}

public function partialMessage(string $partial): void
{
$this->buffer .= $partial;

if (!str_ends_with($partial, self::SUFFIX)) {
return;
}

$message = inflate_add($this->inflator, $this->buffer);
$this->buffer = '';

if ($message === false) {
$this->logger->warning('Failed to decode zlib-stream message(s)');
return;
}

($this->completeHandler)($message);
}

public function onCompleteMessage(Closure $handler): void
{
$this->completeHandler = $handler;
}

public function additionalQueryData(): array
{
return [
'compress' => 'zlib-stream',
];
}
}
7 changes: 5 additions & 2 deletions src/Discord.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Ragnarok\Fenrir\Bitwise\Bitwise;
use Ragnarok\Fenrir\Buffer\BufferInterface;
use Ragnarok\Fenrir\Buffer\Passthrough;
use Ragnarok\Fenrir\Enums\TokenType;
use Ragnarok\Fenrir\Exceptions\Extension\ExtensionNotFoundException;
use Ragnarok\Fenrir\Extension\Extension;
Expand Down Expand Up @@ -48,14 +50,15 @@ public function __construct(
*/
public function withGateway(
Bitwise $intents,
int $timeout = 10
int $timeout = 10,
BufferInterface $buffer = new Passthrough(),
): static {
$this->gateway = new Connection(
$this->loop,
$this->token,
$intents,
$this->mapper,
new Websocket($timeout, $this->logger, [$this->token => '::token::']),
new Websocket($timeout, $this->logger, [$this->token => '::token::'], $buffer),
$this->logger,
);

Expand Down
13 changes: 8 additions & 5 deletions src/Gateway/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
use Ragnarok\Fenrir\Constants\WebsocketEvents;
use Ragnarok\Fenrir\DataMapper;
use Ragnarok\Fenrir\EventHandler;
use Ragnarok\Fenrir\Gateway\Events\Meta\MetaEvent;
use Ragnarok\Fenrir\Gateway\Handlers\HeartbeatAcknowledgedEvent;
use Ragnarok\Fenrir\Gateway\Handlers\IdentifyHelloEvent;
use Ragnarok\Fenrir\Gateway\Handlers\IdentifyResumeEvent;
Expand All @@ -28,7 +27,6 @@
use Ragnarok\Fenrir\Gateway\Helpers\PresenceUpdateBuilder;
use Ragnarok\Fenrir\Gateway\Objects\Payload;
use Ragnarok\Fenrir\WebsocketInterface;
use Ratchet\RFC6455\Messaging\MessageInterface;
use React\EventLoop\LoopInterface;
use React\EventLoop\TimerInterface;
use React\Promise\PromiseInterface;
Expand Down Expand Up @@ -90,8 +88,8 @@ public function __construct(
) {
$this->events = new EventHandler($mapper);

$this->websocket->on(WebsocketEvents::MESSAGE, function (MessageInterface $message) {
$parsedMessage = json_decode((string) $message, depth: 1024);
$this->websocket->on(WebsocketEvents::MESSAGE, function (string $message) {
$parsedMessage = json_decode($message, depth: 1024);
if ($parsedMessage === null) {
return;
}
Expand Down Expand Up @@ -208,7 +206,12 @@ public function setSequence(int $sequence): void

public function connect(string $url): PromiseInterface
{
$url .= '?' . http_build_query(self::QUERY_DATA);
$queryData = [
...self::QUERY_DATA,
...$this->websocket->getBuffer()->additionalQueryData(),
];

$url .= '?' . http_build_query($queryData);

return $this->websocket->open($url);
}
Expand Down
22 changes: 19 additions & 3 deletions src/Websocket.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
use Evenement\EventEmitter;
use JsonSerializable;
use Psr\Log\LoggerInterface;
use Ragnarok\Fenrir\Buffer\BufferInterface;
use Ragnarok\Fenrir\Buffer\Passthrough;
use Ragnarok\Fenrir\Constants\WebsocketEvents;
use Ragnarok\Fenrir\Exceptions\Websocket\ConnectionFailedException;
use Ragnarok\Fenrir\Exceptions\Websocket\ConnectionNotInitializedException;
Expand All @@ -30,8 +32,12 @@ class Websocket extends EventEmitter implements WebsocketInterface

private Bucket $bucket;

public function __construct(private int $timeout, private LoggerInterface $logger, private array $sendLoggerBlacklist = [])
{
public function __construct(
private int $timeout,
private LoggerInterface $logger,
private array $sendLoggerBlacklist = [],
private readonly BufferInterface $buffer = new Passthrough(),
) {
$this->loop = Loop::get();
$this->socketConnector = new SocketConnector(['timeout' => $timeout]);

Expand Down Expand Up @@ -63,11 +69,15 @@ public function open(string $url): PromiseInterface

$this->logger->info('Client: Connection esablished', ['url' => $url]);

$connection->on('message', function (MessageInterface $message) {
$this->buffer->onCompleteMessage(function (string $message) {
$this->logger->debug('Server: New message', ['message' => $message]);
$this->emit(WebsocketEvents::MESSAGE, [$message]);
});

$connection->on('message', function (MessageInterface $message) {
$this->buffer->partialMessage((string) $message);
});

$connection->on('close', function (int $code, string $reason = '') {
$this->logger->debug('Connection closed', ['code' => $code, 'reason' => $reason]);
$this->emit(WebsocketEvents::CLOSE, [$code, $reason]);
Expand Down Expand Up @@ -98,6 +108,7 @@ public function close(int $code, string $reason): void
);

$this->connection->close($code, $reason);
$this->buffer->reset();

unset($this->connection);
}
Expand Down Expand Up @@ -137,4 +148,9 @@ public function sendAsJson(array|JsonSerializable $item, bool $useBucket): void
{
$this->send(json_encode($item), $useBucket);
}

public function getBuffer(): BufferInterface
{
return $this->buffer;
}
}
2 changes: 2 additions & 0 deletions src/WebsocketInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Evenement\EventEmitterInterface;
use JsonSerializable;
use Ragnarok\Fenrir\Buffer\BufferInterface;
use React\Promise\PromiseInterface;

interface WebsocketInterface extends EventEmitterInterface
Expand All @@ -14,4 +15,5 @@ public function open(string $url): PromiseInterface;
public function close(int $code, string $reason): void;
public function send(string $message, bool $useBucket = true): void;
public function sendAsJson(array|JsonSerializable $item, bool $useBucket): void;
public function getBuffer(): BufferInterface;
}
Loading