Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 19 additions & 80 deletions mgmt/src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ use std::sync::Arc;
use tonic::{Request, Response, Status};
use tracing::{debug, error};

use crate::processor::proc::{ConfigChannelRequest, ConfigRequest, ConfigResponse};
use crate::processor::mgmt_client::ConfigClient;
use config::converters::grpc::{
convert_dataplane_status_to_grpc, convert_gateway_config_from_grpc_with_defaults,
};
use config::internal::status::DataplaneStatus;
use config::{GenId, GwConfig};
use tokio::sync::mpsc::Sender;

// Import proto-generated types
use gateway_config::{
Expand Down Expand Up @@ -113,116 +112,56 @@ impl ConfigService for ConfigServiceImpl {

/// Basic configuration manager implementation
pub struct BasicConfigManager {
channel_tx: Sender<ConfigChannelRequest>,
client: ConfigClient,
}

impl BasicConfigManager {
pub fn new(channel_tx: Sender<ConfigChannelRequest>) -> Self {
Self { channel_tx }
pub fn new(client: ConfigClient) -> Self {
Self { client }
}
}

#[async_trait]
impl ConfigManager for BasicConfigManager {
async fn get_current_config(&self) -> Result<GatewayConfig, String> {
debug!("Received request to get current config");

// build a request to the config processor, send it and get the response
let (req, rx) = ConfigChannelRequest::new(ConfigRequest::GetCurrentConfig);
self.channel_tx
.send(req)
.await
.map_err(|_| "Failure relaying request".to_string())?;
let response = rx
let config = self
.client
.get_current_config()
.await
.map_err(|_| "Failure receiving from config processor".to_string())?;
match response {
ConfigResponse::GetCurrentConfig(opt_config) => {
if let Some(config) = *opt_config {
gateway_config::GatewayConfig::try_from(&config.external)
} else {
Err("No config is currently applied".to_string())
}
}
_ => unreachable!(),
}
.map_err(|e| e.to_string())?;
gateway_config::GatewayConfig::try_from(&config.external)
}

async fn get_generation(&self) -> Result<GenId, String> {
debug!("Received request to get current config generation");

// build a request to the config processor, send it and get the response
let (req, rx) = ConfigChannelRequest::new(ConfigRequest::GetGeneration);
self.channel_tx
.send(req)
.await
.map_err(|_| "Failure relaying request".to_string())?;
let response = rx
self.client
.get_generation()
.await
.map_err(|_| "Failure receiving from config processor".to_string())?;
match response {
ConfigResponse::GetGeneration(opt_genid) => {
opt_genid.ok_or_else(|| "No config is currently applied".to_string())
}
_ => unreachable!(),
}
.map_err(|e| e.to_string())
}

async fn apply_config(&self, grpc_config: GatewayConfig) -> Result<(), String> {
debug!("Received request to apply new config");

// Convert config from gRPC to native external model
let external_config = convert_gateway_config_from_grpc_with_defaults(&grpc_config)
.map_err(|e| {
error!("Failed to process apply config: {e}");
error!("Failed to parse config: {e}");
e
})?;

// Create a new GwConfig with this ExternalConfig
let gw_config = Box::new(GwConfig::new(external_config));

// build a request to the config processor, send it and get the response
let (req, rx) = ConfigChannelRequest::new(ConfigRequest::ApplyConfig(gw_config));
self.channel_tx
.send(req)
.await
.map_err(|_| "Failure relaying request".to_string())?;
let response = rx
self.client
.apply_config(GwConfig::new(external_config))
.await
.map_err(|_| "Failure receiving from config processor".to_string())?;
match response {
ConfigResponse::ApplyConfig(result) => {
result.map_err(|e| format!("Failed to apply config: {e}"))
}
_ => unreachable!(),
}
.map_err(|e| e.to_string())
}

async fn get_dataplane_status(&self) -> Result<DataplaneStatus, String> {
debug!("Received request to get dataplane status");

// build a request to the config processor, send it and get the response
let (req, rx) = ConfigChannelRequest::new(ConfigRequest::GetDataplaneStatus);
self.channel_tx
.send(req)
.await
.map_err(|_| "Failure relaying request".to_string())?;
let response = rx
.await
.map_err(|_| "Failure receiving from config processor".to_string())?;

match response {
ConfigResponse::GetDataplaneStatus(status) => Ok(*status),
_ => unreachable!(),
}
self.client.get_status().await.map_err(|e| e.to_string())
}
}

/// Function to create the gRPC service
pub fn create_config_service(
channel_tx: Sender<ConfigChannelRequest>,
) -> ConfigServiceServer<ConfigServiceImpl> {
let config_manager = Arc::new(BasicConfigManager::new(channel_tx));
pub fn create_config_service(client: ConfigClient) -> ConfigServiceServer<ConfigServiceImpl> {
let config_manager = Arc::new(BasicConfigManager::new(client));
let service = ConfigServiceImpl::new(config_manager);
ConfigServiceServer::new(service)
}
Loading
Loading