Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 64 additions & 17 deletions src/DogStatsd.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace DataDog;

use DataDog\OriginDetection;
use Exception;

/**
* Datadog implementation of StatsD
Expand Down Expand Up @@ -58,6 +59,10 @@ class DogStatsd
* @var string The container ID field, used for origin detection
*/
private $containerID;
/**
* @var (callable(\Throwable, string))|null The closure which is executed when there is a failure flushing metrics.
*/
private $flushFailureHandler = null;

// Telemetry
private $disable_telemetry;
Expand Down Expand Up @@ -85,7 +90,8 @@ class DogStatsd
* metric_prefix,
* disable_telemetry,
* container_id,
* origin_detecion
* origin_detection
* flush_failure_handler
*
* @param array{
* host?: string,
Expand All @@ -98,7 +104,8 @@ class DogStatsd
* metric_prefix?: string,
* disable_telemetry?: bool,
* container_id?: string,
* origin_detection?: bool
* origin_detection?: bool,
* flush_failure_handler?: callable
* } $config
*/
public function __construct(array $config = array())
Expand Down Expand Up @@ -180,6 +187,10 @@ public function __construct(array $config = array())

$containerID = isset($config["container_id"]) ? $config["container_id"] : "";
$this->containerID = $originDetection->getContainerID($containerID, $originDetectionEnabled);

$this->flushFailureHandler = isset($config['flush_failure_handler'])
? $config['flush_failure_handler']
: null;
}

/**
Expand Down Expand Up @@ -643,24 +654,29 @@ public function report($message)
$this->flush($message);
}

/**
* @throws \Exception|\Throwable
*/
public function flush($message)
{
$message .= $this->flushTelemetry();

// Non - Blocking UDP I/O - Use IP Addresses!
if (!is_null($this->socketPath)) {
$socket = socket_create(AF_UNIX, SOCK_DGRAM, 0);
} elseif (filter_var($this->host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6)) {
$socket = socket_create(AF_INET6, SOCK_DGRAM, SOL_UDP);
} else {
$socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
}
socket_set_nonblock($socket);

if (!is_null($this->socketPath)) {
$res = socket_sendto($socket, $message, strlen($message), 0, $this->socketPath);
} else {
$res = socket_sendto($socket, $message, strlen($message), 0, $this->host, $this->port);
try {
$res = $this->writeToSocket($message);
} catch (\Throwable $e) {
if ($this->flushFailureHandler === null) {
throw $e;
} else {
call_user_func($this->flushFailureHandler, $e, $message);
$res = false;
}
} catch (Exception $e) {
if ($this->flushFailureHandler === null) {
throw $e;
} else {
call_user_func($this->flushFailureHandler, $e, $message);
$res = false;
}
}

if ($res !== false) {
Expand All @@ -671,10 +687,41 @@ public function flush($message)
$this->bytes_dropped += strlen($message);
$this->packets_dropped += 1;
}
}

/**
* @param string $message
* @return false|int
* @throws \Exception|\Throwable
*/
protected function writeToSocket($message)
{
try {
// Non - Blocking UDP I/O - Use IP Addresses!
if (!is_null($this->socketPath)) {
$socket = socket_create(AF_UNIX, SOCK_DGRAM, 0);
} elseif (filter_var($this->host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6)) {
$socket = socket_create(AF_INET6, SOCK_DGRAM, SOL_UDP);
} else {
$socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
}
socket_set_nonblock($socket);

socket_close($socket);
if (!is_null($this->socketPath)) {
$res = socket_sendto($socket, $message, strlen($message), 0, $this->socketPath);
} else {
$res = socket_sendto($socket, $message, strlen($message), 0, $this->host, $this->port);
}

return $res;
} finally {
if (isset($socket)) {
socket_close($socket);
}
}
}


/**
* Formats $vals array into event for submission to Datadog via UDP
*
Expand Down
9 changes: 9 additions & 0 deletions tests/TestHelpers/SocketSpy.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ class SocketSpy
*/
public $returnErrorOnSend = false;

/**
* @var null|callable
*/
public $errorThrownOnSend = null;

/**
* @param int $domain
* @param int $type
Expand Down Expand Up @@ -88,6 +93,10 @@ public function socketSendtoWasCalledWithArgs(
$addr,
$port
) {
if ($this->errorThrownOnSend !== null) {
call_user_func($this->errorThrownOnSend, $socket, $buf, $len, $flags);
}

if ($this->returnErrorOnSend === true) {
return false;
}
Expand Down
26 changes: 25 additions & 1 deletion tests/UnitTests/DogStatsd/SocketsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace DataDog\UnitTests\DogStatsd;

use DateTime;
use ErrorException;
use ReflectionProperty;
use DataDog\DogStatsd;
use DataDog\TestHelpers\SocketSpyTestCase;
Expand Down Expand Up @@ -42,6 +43,7 @@ public function set_up()
$this->oldOriginDetectionEnabled = getenv("DD_ORIGIN_DETECTION_ENABLED");

putenv("DD_EXTERNAL_ENV");
$this->getSocketSpy()->errorThrownOnSend = null;
}

protected function tear_down() {
Expand Down Expand Up @@ -349,7 +351,7 @@ public function testMicrotiming()
public function testGauge()
{
$this->disableOriginDetectionLinux();

$stat = 'some.gauge_metric';
$value = 5;
$sampleRate = 1.0;
Expand Down Expand Up @@ -1500,6 +1502,28 @@ public function testTelemetryNetworkError()
$this->assertSameWithTelemetry('', $this->getSocketSpy()->argsFromSocketSendtoCalls[1][1], "", array("bytes_sent" => 677, "packets_sent" => 1, "metrics" => 0));
}

public function testCustomSocketFailureHandler()
{
$this->disableOriginDetectionLinux();

$errorStore = null;
$dog = new DogStatsd(array("disable_telemetry" => false, "flush_failure_handler" => function ($err) use (&$errorStore) {
$errorStore = $err;
}));

$this->getSocketSpy()->errorThrownOnSend = function () {
trigger_error(
'ErrorException: socket_sendto(): Unable to write to socket [111]: Connection refused',
E_USER_WARNING
);
};
$dog->increment('test');
$this->assertNotNull($errorStore);
$this->getSocketSpy()->errorThrownOnSend = null;
$dog->flush('');
$this->assertSameWithTelemetry('', $this->getSocketSpy()->argsFromSocketSendtoCalls[0][1], "", array("bytes_dropped" => 673, "packets_sent" => 0, "metrics" => 1, 'packets_dropped' => 1));
}

public function testDecimalNormalization()
{
$this->disableOriginDetectionLinux();
Expand Down
Loading