Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ publish = false

[dependencies]
async-trait = "0.1.89"
clap = { version = "4.6", features = ["derive"] }
clap = { version = "4.6", features = ["derive", "env"] }
dirs = "6"
exn = "0.3"
humantime = "2"
Expand Down
3 changes: 3 additions & 0 deletions src/application/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
pub mod ports;
pub mod service;
pub mod updater_service;

#[cfg(test)]
mod test_support;
105 changes: 17 additions & 88 deletions src/application/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::domain::models::{

const DEFAULT_SEARCH_LIMIT: u64 = 50;
const MAX_STREAM_SEARCH_LIMIT: u64 = 100;
const MAX_ALL_PAGES_MESSAGES: usize = 10_000;
const DEFAULT_SEARCH_OFFSET: u64 = 0;
const DEFAULT_SEARCH_SORT: &str = "timestamp";

Expand Down Expand Up @@ -89,16 +90,7 @@ impl ApplicationService {
let mut input = input;

if input.all_fields && input.fields.is_empty() {
let config = self
.config_store
.load()
.await
.or_raise(|| CliError::Config("failed to load runtime config".to_string()))?
.ok_or_else(|| {
CliError::Config(
"graylog is not configured, run `graylog-cli auth` first".to_string(),
)
})?;
let config = self.require_config().await?;
let ttl = config.graylog.fields_cache_ttl_seconds;
let cache_key = "fields".to_string();
let now = SystemTime::now()
Expand Down Expand Up @@ -381,16 +373,7 @@ impl ApplicationService {
command: &'static str,
request: MessageSearchRequest,
) -> exn::Result<MessageSearchStatus, CliError> {
let config = self
.config_store
.load()
.await
.or_raise(|| CliError::Config("failed to load runtime config".to_string()))?
.ok_or_else(|| {
CliError::Config(
"graylog is not configured, run `graylog-cli auth` first".to_string(),
)
})?;
let config = self.require_config().await?;
let client = self.graylog_gateway_with_config(config.graylog)?;
let result = client.search_messages(request.clone()).await.or_raise(|| {
CliError::Http(HttpError::Unavailable {
Expand Down Expand Up @@ -419,16 +402,7 @@ impl ApplicationService {
&self,
input: &SearchCommandInput,
) -> exn::Result<MessageSearchStatus, CliError> {
let config = self
.config_store
.load()
.await
.or_raise(|| CliError::Config("failed to load runtime config".to_string()))?
.ok_or_else(|| {
CliError::Config(
"graylog is not configured, run `graylog-cli auth` first".to_string(),
)
})?;
let config = self.require_config().await?;
let client = self.graylog_gateway_with_config(config.graylog)?;
let mut request = self.build_search_request(input.clone(), DEFAULT_SEARCH_LIMIT);
let mut all_messages = Vec::new();
Expand Down Expand Up @@ -456,6 +430,10 @@ impl ApplicationService {
let fetched = result.messages.len();
all_messages.extend(result.messages);

if all_messages.len() >= MAX_ALL_PAGES_MESSAGES {
break;
}

request.offset += fetched as u64;

if fetched == 0 {
Expand Down Expand Up @@ -535,16 +513,7 @@ impl ApplicationService {
command: &'static str,
request: AggregateSearchRequest,
) -> exn::Result<AggregateStatus, CliError> {
let config = self
.config_store
.load()
.await
.or_raise(|| CliError::Config("failed to load runtime config".to_string()))?
.ok_or_else(|| {
CliError::Config(
"graylog is not configured, run `graylog-cli auth` first".to_string(),
)
})?;
let config = self.require_config().await?;
let client = self.graylog_gateway_with_config(config.graylog)?;
let aggregation_type = request.aggregation_type.as_cli_value();
let result = client.search_aggregate(request).await.or_raise(|| {
Expand All @@ -562,18 +531,21 @@ impl ApplicationService {
})
}

async fn graylog_gateway(&self) -> exn::Result<Arc<dyn GraylogGateway>, CliError> {
let config = self
.config_store
async fn require_config(&self) -> exn::Result<Config, CliError> {
self.config_store
.load()
.await
.or_raise(|| CliError::Config("failed to load runtime config".to_string()))?
.ok_or_else(|| {
CliError::Config(
"graylog is not configured, run `graylog-cli auth` first".to_string(),
)
})?;
})
.map_err(Into::into)
}

async fn graylog_gateway(&self) -> exn::Result<Arc<dyn GraylogGateway>, CliError> {
let config = self.require_config().await?;
self.graylog_gateway_with_config(config.graylog)
}

Expand Down Expand Up @@ -639,14 +611,13 @@ fn parse_timestamp_to_millis(ts: &str) -> Option<i64> {
mod tests {
use super::*;

use std::collections::HashMap;
use std::sync::Mutex;

use async_trait::async_trait;
use serde_json::{Map, Value};

use crate::application::ports::cache_store::CacheError;
use crate::application::ports::config_store::ConfigError;
use crate::application::test_support::fakes::FakeCacheStore;
use crate::domain::config::{
DEFAULT_FIELDS_CACHE_TTL_SECONDS, DEFAULT_TIMEOUT_SECONDS, UpdaterConfig,
};
Expand Down Expand Up @@ -701,48 +672,6 @@ mod tests {
}
}

#[derive(Clone, Default)]
struct FakeCacheStore {
storage: Arc<Mutex<HashMap<String, String>>>,
}

impl FakeCacheStore {
fn get(&self, key: &str) -> Option<String> {
self.storage
.lock()
.expect("cache mutex should not be poisoned")
.get(key)
.cloned()
}

fn insert(&self, key: &str, value: String) {
self.storage
.lock()
.expect("cache mutex should not be poisoned")
.insert(key.to_string(), value);
}
}

#[async_trait]
impl CacheStore for FakeCacheStore {
async fn get_serialized(&self, key: &str) -> exn::Result<Option<String>, CacheError> {
Ok(self
.storage
.lock()
.expect("cache mutex should not be poisoned")
.get(key)
.cloned())
}

async fn save_serialized(&self, key: String, data: String) -> exn::Result<(), CacheError> {
self.storage
.lock()
.expect("cache mutex should not be poisoned")
.insert(key, data);
Ok(())
}
}

#[derive(Clone)]
struct FakeGraylogGateway {
search_results: Arc<Mutex<Vec<MessageSearchResult>>>,
Expand Down
51 changes: 51 additions & 0 deletions src/application/test_support.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#[cfg(test)]
pub(crate) mod fakes {
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use async_trait::async_trait;

use crate::application::ports::cache_store::{CacheError, CacheStore};

#[derive(Clone, Default)]
pub(crate) struct FakeCacheStore {
storage: Arc<Mutex<HashMap<String, String>>>,
}

impl FakeCacheStore {
pub(crate) fn get(&self, key: &str) -> Option<String> {
self.storage
.lock()
.expect("cache mutex should not be poisoned")
.get(key)
.cloned()
}

pub(crate) fn insert(&self, key: &str, value: String) {
self.storage
.lock()
.expect("cache mutex should not be poisoned")
.insert(key.to_string(), value);
}
}

#[async_trait]
impl CacheStore for FakeCacheStore {
async fn get_serialized(&self, key: &str) -> exn::Result<Option<String>, CacheError> {
Ok(self
.storage
.lock()
.expect("cache mutex should not be poisoned")
.get(key)
.cloned())
}

async fn save_serialized(&self, key: String, data: String) -> exn::Result<(), CacheError> {
self.storage
.lock()
.expect("cache mutex should not be poisoned")
.insert(key, data);
Ok(())
}
}
}
28 changes: 1 addition & 27 deletions src/application/updater_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,40 +338,14 @@ fn unix_now() -> u64 {
mod tests {
use super::*;

use std::collections::HashMap;
use std::path::Path;
use std::sync::Mutex;

use async_trait::async_trait;
use tempfile::TempDir;

use crate::application::ports::cache_store::CacheError;
use crate::application::ports::updater::ReleaseInfo;

#[derive(Default)]
struct FakeCacheStore {
entries: Mutex<HashMap<String, String>>,
}

#[async_trait]
impl CacheStore for FakeCacheStore {
async fn get_serialized(&self, key: &str) -> exn::Result<Option<String>, CacheError> {
Ok(self
.entries
.lock()
.expect("cache mutex should not be poisoned")
.get(key)
.cloned())
}

async fn save_serialized(&self, key: String, data: String) -> exn::Result<(), CacheError> {
self.entries
.lock()
.expect("cache mutex should not be poisoned")
.insert(key, data);
Ok(())
}
}
use crate::application::test_support::fakes::FakeCacheStore;

struct FakeUpdater {
release: Mutex<ReleaseInfo>,
Expand Down
11 changes: 10 additions & 1 deletion src/infrastructure/config_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,16 @@ impl CacheStore for FileConfigStore {
std::fs::create_dir_all(parent)
.map_err(|error| CacheError::OperationFailure(error.to_string()))?;
std::fs::write(&cache_path, data)
.map_err(|error| CacheError::OperationFailure(error.to_string()))
.map_err(|error| CacheError::OperationFailure(error.to_string()))?;

#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let _ =
std::fs::set_permissions(&cache_path, std::fs::Permissions::from_mode(0o600));
}

Ok::<(), CacheError>(())
})
.await
.map_err(|error| CacheError::StoreUnavailable(format!("failed to write cache: {error}")))?
Expand Down
11 changes: 9 additions & 2 deletions src/presentation/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,13 @@ pub struct AuthArgs {
#[arg(short = 'u', long = "url", required = true)]
pub url: String,
/// Graylog access token.
#[arg(short = 't', long = "token", required = true)]
#[arg(short = 't', long = "token", required = true, env = "GRAYLOG_TOKEN")]
pub token: String,
}

#[derive(Debug, Args)]
pub struct SearchArgs {
#[arg(help = "Lucene search query")]
pub query: String,
#[command(flatten)]
pub timerange: TimerangeArgs,
Expand Down Expand Up @@ -131,10 +132,11 @@ impl SearchArgs {

#[derive(Debug, Args)]
pub struct AggregateArgs {
#[arg(help = "Lucene search query")]
pub query: String,
#[arg(long = "aggregation-type", value_enum)]
pub aggregation_type: AggregationTypeArg,
#[arg(long = "field")]
#[arg(long = "field", help = "Field to aggregate on")]
pub field: String,
#[arg(long = "size", value_parser = clap::value_parser!(u64).range(1..=100))]
pub size: Option<u64>,
Expand Down Expand Up @@ -230,17 +232,21 @@ impl StreamsCommands {

#[derive(Debug, Args)]
pub struct StreamIdArgs {
#[arg(help = "Graylog stream ID")]
pub stream_id: String,
}

#[derive(Debug, Args)]
pub struct StreamNameArgs {
#[arg(help = "Stream name to search for")]
pub name: String,
}

#[derive(Debug, Args)]
pub struct StreamSearchArgs {
#[arg(help = "Graylog stream ID")]
pub stream_id: String,
#[arg(help = "Lucene search query")]
pub query: String,
#[command(flatten)]
pub timerange: TimerangeArgs,
Expand Down Expand Up @@ -270,6 +276,7 @@ impl StreamSearchArgs {

#[derive(Debug, Args)]
pub struct StreamIdTimerangeArgs {
#[arg(help = "Graylog stream ID")]
pub stream_id: String,
#[command(flatten)]
pub timerange: TimerangeArgs,
Expand Down
Loading