Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions datafusion-postgres-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use datafusion::execution::options::{
ArrowReadOptions, AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_postgres::auth::AuthManager;
use datafusion_postgres::auth::{AuthManager, RoleProviderBridge};
use datafusion_postgres::datafusion_pg_catalog::setup_pg_catalog;
use datafusion_postgres::{serve, ServerOptions};
use env_logger::Env;
Expand Down Expand Up @@ -180,8 +180,9 @@ async fn setup_session_context(
info!("Loaded {table_path} as table {table_name}");
}

// Register pg_catalog
setup_pg_catalog(session_context, "datafusion", auth_manager)?;
// Register pg_catalog using RoleProviderBridge for decoupled auth integration
let role_bridge = RoleProviderBridge::new(auth_manager);
setup_pg_catalog(session_context, "datafusion", role_bridge)?;

Ok(())
}
Expand All @@ -199,7 +200,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let session_config = SessionConfig::new().with_information_schema(true);
let session_context = SessionContext::new_with_config(session_config);

// TODO: remove or replace AuthManager for pg_catalog
// Create AuthManager for pg_catalog role information
let auth_manager = Arc::new(AuthManager::new());
setup_session_context(&session_context, &opts, Arc::clone(&auth_manager)).await?;

Expand Down
66 changes: 60 additions & 6 deletions datafusion-postgres/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,32 @@ use pgwire::api::auth::{AuthSource, LoginInfo, Password};
use pgwire::error::{PgWireError, PgWireResult};
use tokio::sync::RwLock;

use datafusion_pg_catalog::pg_catalog::context::*;
// Re-export Role and related types from pg_catalog context for use by other modules
pub use datafusion_pg_catalog::pg_catalog::context::{
Grant, Permission, ResourceType, Role, RoleConfig, User,
};

/// Trait for providing role information to pg_catalog
/// This decouples AuthManager from PgCatalogContextProvider
#[async_trait]
pub trait RoleProvider: Send + Sync {
/// List all role names
async fn list_roles(&self) -> Vec<String>;
/// Get role details by name
async fn get_role(&self, name: &str) -> Option<Role>;
}

/// Implement RoleProvider for Arc<T> to allow shared ownership
#[async_trait]
impl<T: RoleProvider + ?Sized> RoleProvider for Arc<T> {
async fn list_roles(&self) -> Vec<String> {
(**self).list_roles().await
}

async fn get_role(&self, name: &str) -> Option<Role> {
(**self).get_role(name).await
}
}

/// Authentication manager that handles users and roles
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -446,15 +471,44 @@ impl AuthManager {
}

#[async_trait]
impl PgCatalogContextProvider for AuthManager {
// retrieve all database role names
impl RoleProvider for AuthManager {
async fn list_roles(&self) -> Vec<String> {
let roles = self.roles.read().await;
roles.keys().cloned().collect()
}

async fn get_role(&self, name: &str) -> Option<Role> {
let roles = self.roles.read().await;
roles.get(name).cloned()
}
}

/// Bridge adapter that implements PgCatalogContextProvider using a RoleProvider
/// This allows decoupling of AuthManager from pg_catalog while still providing
/// role information to pg_roles table
#[derive(Debug, Clone)]
pub struct RoleProviderBridge<R: RoleProvider + Clone> {
role_provider: R,
}

impl<R: RoleProvider + Clone> RoleProviderBridge<R> {
pub fn new(role_provider: R) -> Self {
Self { role_provider }
}
}

use datafusion_pg_catalog::pg_catalog::context::PgCatalogContextProvider;

#[async_trait]
impl<R: RoleProvider + Clone + std::fmt::Debug + 'static> PgCatalogContextProvider
for RoleProviderBridge<R>
{
async fn roles(&self) -> Vec<String> {
self.list_roles().await
self.role_provider.list_roles().await
}

// retrieve database role information
async fn role(&self, name: &str) -> Option<Role> {
self.get_role(name).await
self.role_provider.get_role(name).await
}
}

Expand Down
15 changes: 8 additions & 7 deletions datafusion-postgres/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,19 @@ use pgwire::{
},
};

use crate::{auth::AuthManager, DfSessionService};
use crate::auth::{AuthManager, RoleProviderBridge};
use crate::DfSessionService;

pub fn setup_handlers() -> DfSessionService {
let session_config = SessionConfig::new().with_information_schema(true);
let session_context = SessionContext::new_with_config(session_config);

setup_pg_catalog(
&session_context,
"datafusion",
Arc::new(AuthManager::default()),
)
.expect("Failed to setup sesession context");
// Use RoleProviderBridge to decouple AuthManager from pg_catalog
let auth_manager = Arc::new(AuthManager::default());
let role_bridge = RoleProviderBridge::new(auth_manager);

setup_pg_catalog(&session_context, "datafusion", role_bridge)
.expect("Failed to setup sesession context");

DfSessionService::new(Arc::new(session_context))
}
Expand Down
Loading