Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pgdog-config/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use super::rewrite::Rewrite;
use super::sharding::{ManualQuery, OmnishardedTables, ShardedMapping, ShardedTable};
use super::users::{Admin, Plugin, Users};

#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
pub struct ConfigAndUsers {
/// pgdog.toml
pub config: Config,
Expand Down
1 change: 1 addition & 0 deletions pgdog-postgres-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ bytes = "*"
thiserror = "*"
uuid = "*"
serde = { version = "*", features = ["derive"]}
schemars.workspace = true
pgdog-vector = { path = "../pgdog-vector" }
rust_decimal = { version = "1.36", features = ["db-postgres"] }
3 changes: 2 additions & 1 deletion pgdog-postgres-types/src/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ use super::*;
use super::interval::bigint;
use bytes::{Buf, Bytes};
use chrono::{Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

// PostgreSQL epoch is 2000-01-01 00:00:00 UTC, which is 946684800 seconds after Unix epoch
const POSTGRES_EPOCH_MICROS: i64 = 946684800000000; // microseconds

#[derive(Debug, Copy, Clone, PartialEq, Eq, Default, Hash, Serialize, Deserialize)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Default, Hash, Serialize, Deserialize, JsonSchema)]
pub struct Timestamp {
pub year: i64,
pub month: i8,
Expand Down
14 changes: 13 additions & 1 deletion pgdog-postgres-types/src/timestamptz.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
use std::ops::{Deref, DerefMut};

use bytes::Bytes;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use super::*;

#[derive(
Debug, Copy, Clone, PartialEq, Ord, PartialOrd, Eq, Default, Hash, Serialize, Deserialize,
Debug,
Copy,
Clone,
PartialEq,
Ord,
PartialOrd,
Eq,
Default,
Hash,
Serialize,
Deserialize,
JsonSchema,
)]
pub struct TimestampTz {
timestamp: Timestamp,
Expand Down
1 change: 1 addition & 0 deletions pgdog-stats/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ pgdog-config = { path = "../pgdog-config" }
pgdog-postgres-types = { path = "../pgdog-postgres-types" }
bytes = "*"
indexmap = { version = "*", features = ["serde"] }
schemars.workspace = true
7 changes: 4 additions & 3 deletions pgdog-stats/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
};

use pgdog_config::{PoolerMode, PreparedStatements, pooling::ConnectionRecovery};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use crate::{LsnStats, ReplicaLag};
Expand All @@ -12,7 +13,7 @@ use crate::{LsnStats, ReplicaLag};
///
/// These are updated after each connection check-in.
///
#[derive(Debug, Clone, Default, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, Copy, Serialize, Deserialize, JsonSchema)]
pub struct Counts {
/// Number of committed transactions.
pub xact_count: usize,
Expand Down Expand Up @@ -170,7 +171,7 @@ impl Div<usize> for Counts {
}
}

#[derive(Debug, Clone, Default, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, Copy, Serialize, Deserialize, JsonSchema)]
pub struct Stats {
// Total counts.
pub counts: Counts,
Expand Down Expand Up @@ -228,7 +229,7 @@ impl Stats {

/// Real-time state of each connection pool.
/// Pool state.
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
#[derive(Debug, Serialize, Deserialize, Clone, Copy, JsonSchema)]
pub struct State {
/// Number of connections checked out.
pub checked_out: usize,
Expand Down
27 changes: 24 additions & 3 deletions pgdog-stats/src/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,22 @@ use std::time::{Duration, SystemTime};
use bytes::Bytes;
use pgdog_postgres_types::Error;
use pgdog_postgres_types::{Format, FromDataType, TimestampTz};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

#[derive(
Debug, Clone, Default, Copy, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash,
Debug,
Clone,
Default,
Copy,
Eq,
PartialEq,
Ord,
PartialOrd,
Serialize,
Deserialize,
Hash,
JsonSchema,
)]
pub struct Lsn {
pub high: i64,
Expand Down Expand Up @@ -72,7 +84,7 @@ impl Display for Lsn {
}

/// LSN information.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema)]
pub struct LsnStats {
/// pg_is_in_recovery()
pub replica: bool,
Expand All @@ -83,11 +95,20 @@ pub struct LsnStats {
/// Server timestamp.
pub timestamp: TimestampTz,
/// Our timestamp.
#[schemars(with = "SystemTimeRepr")]
pub fetched: SystemTime,
/// Running on Aurora.
pub aurora: bool,
}

/// Schema-only mirror of `std::time::SystemTime`'s default serde representation.
///
/// `SystemTime` does not implement `schemars::JsonSchema`, so `LsnStats::fetched`
/// points at this type via `#[schemars(with = "SystemTimeRepr")]` to describe the
/// JSON shape serde emits for it: an object with `secs_since_epoch` and
/// `nanos_since_epoch` keys. This type is never constructed at runtime — it
/// exists solely so the `JsonSchema` derive can generate the matching schema
/// (hence the `#[allow(dead_code)]`).
#[derive(JsonSchema)]
#[allow(dead_code)]
struct SystemTimeRepr {
// Whole seconds since the Unix epoch (field name matches serde's
// default `SystemTime` serialization — do not rename).
secs_since_epoch: u64,
// Sub-second remainder in nanoseconds (0..=999_999_999 in serde's
// output; the schema itself only constrains this to u32 — NOTE(review):
// range not enforced here).
nanos_since_epoch: u32,
}

impl LsnStats {
/// Stats contain real data.
pub fn valid(&self) -> bool {
Expand All @@ -108,7 +129,7 @@ impl Default for LsnStats {
}
}

#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, JsonSchema)]
pub struct ReplicaLag {
pub duration: Duration,
pub bytes: i64,
Expand Down
5 changes: 5 additions & 0 deletions pgdog/src/backend/pool/waiting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ impl Waiting {
if !guard.online {
return Err(Error::Offline);
}
if request.read {
guard.stats.counts.reads += 1;
} else {
guard.stats.counts.writes += 1;
}
guard.waiting.push_back(Waiter { request, tx });
guard.full()
};
Expand Down
Loading