1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

@@ -125,6 +125,7 @@ async fn json_opener() -> Result<()> {
projected,
FileCompressionType::UNCOMPRESSED,
Arc::new(object_store),
true,
);

let scan_config = FileScanConfigBuilder::new(
19 changes: 19 additions & 0 deletions datafusion/common/src/config.rs
@@ -3065,6 +3065,25 @@ config_namespace! {
/// If not specified, the default level for the compression algorithm is used.
pub compression_level: Option<u32>, default = None
pub schema_infer_max_rec: Option<usize>, default = None
/// The JSON format to use when reading files.
///
/// When `true` (default), expects newline-delimited JSON (NDJSON):
/// ```text
/// {"key1": 1, "key2": "val"}
/// {"key1": 2, "key2": "vals"}
/// ```
///
/// When `false`, expects JSON array format:
/// ```text
/// [
/// {"key1": 1, "key2": "val"},
/// {"key1": 2, "key2": "vals"}
/// ]
/// ```
///
/// Note: JSON array format requires loading the entire file into memory.
/// For large files, newline-delimited format is recommended.
pub newline_delimited: bool, default = true
}
}

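As a usage sketch of the option documented above: the tests below exercise `NdJsonReadOptions::default().newline_delimited(false)`, so reading a JSON-array file from application code would presumably look roughly like this (the file path and table name are illustrative, not taken from the PR):

```rust
use datafusion::error::Result;
use datafusion::prelude::{NdJsonReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Illustrative path: the file holds a single JSON array such as
    // [{"a": 1, "b": "x"}, {"a": 2, "b": "y"}] rather than one object per line.
    let options = NdJsonReadOptions::default().newline_delimited(false);
    ctx.register_json("events", "data/events.json", options).await?;

    // Query it like any other table; note that the array variant buffers the
    // whole file in memory, as the doc comment above points out.
    ctx.sql("SELECT a, b FROM events ORDER BY a").await?.show().await?;
    Ok(())
}
```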
246 changes: 243 additions & 3 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -47,12 +47,54 @@ mod tests {
use datafusion_common::stats::Precision;

use datafusion_common::Result;
use datafusion_datasource::file_compression_type::FileCompressionType;
use futures::StreamExt;
use insta::assert_snapshot;
use object_store::local::LocalFileSystem;
use regex::Regex;
use rstest::rstest;

// ==================== Test Helpers ====================

/// Create a temporary JSON file and return (TempDir, path)
fn create_temp_json(content: &str) -> (tempfile::TempDir, String) {
let tmp_dir = tempfile::TempDir::new().unwrap();
let path = format!("{}/test.json", tmp_dir.path().to_string_lossy());
std::fs::write(&path, content).unwrap();
(tmp_dir, path)
}

/// Infer schema from JSON array format file
async fn infer_json_array_schema(
content: &str,
) -> Result<arrow::datatypes::SchemaRef> {
let (_tmp_dir, path) = create_temp_json(content);
let session = SessionContext::new();
let ctx = session.state();
let store = Arc::new(LocalFileSystem::new()) as _;
let format = JsonFormat::default().with_newline_delimited(false);
format
.infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
.await
}

/// Register a JSON array table and run a query
async fn query_json_array(content: &str, query: &str) -> Result<Vec<RecordBatch>> {
let (_tmp_dir, path) = create_temp_json(content);
let ctx = SessionContext::new();
let options = NdJsonReadOptions::default().newline_delimited(false);
ctx.register_json("test_table", &path, options).await?;
ctx.sql(query).await?.collect().await
}

/// Register a JSON array table and run a query, return formatted string
async fn query_json_array_str(content: &str, query: &str) -> Result<String> {
let result = query_json_array(content, query).await?;
Ok(batches_to_string(&result))
}

// ==================== Existing Tests ====================

#[tokio::test]
async fn read_small_batches() -> Result<()> {
let config = SessionConfig::new().with_batch_size(2);
@@ -314,7 +356,6 @@ mod tests {
.digest(r#"{ "c1": 11, "c2": 12, "c3": 13, "c4": 14, "c5": 15 }"#.into());

let mut all_batches = RecordBatch::new_empty(schema.clone());
// We get RequiresMoreData after 2 batches because of how json::Decoder works
for _ in 0..2 {
let output = deserializer.next()?;
let DeserializerOutput::RecordBatch(batch) = output else {
@@ -358,7 +399,6 @@
let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?;
df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
.await?;
// Expected the file to exist and be empty
assert!(std::path::Path::new(&path).exists());
let metadata = std::fs::metadata(&path)?;
assert_eq!(metadata.len(), 0);
@@ -385,10 +425,210 @@
let df = ctx.read_batch(empty_batch.clone())?;
df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
.await?;
// Expected the file to exist and be empty
assert!(std::path::Path::new(&path).exists());
let metadata = std::fs::metadata(&path)?;
assert_eq!(metadata.len(), 0);
Ok(())
}

// ==================== JSON Array Format Tests ====================

#[tokio::test]
async fn test_json_array_schema_inference() -> Result<()> {
let schema = infer_json_array_schema(
r#"[{"a": 1, "b": 2.0, "c": true}, {"a": 2, "b": 3.5, "c": false}]"#,
)
.await?;

let fields: Vec<_> = schema
.fields()
.iter()
.map(|f| format!("{}: {:?}", f.name(), f.data_type()))
.collect();
assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields);
Ok(())
}

#[tokio::test]
async fn test_json_array_empty() -> Result<()> {
let schema = infer_json_array_schema("[]").await?;
assert_eq!(schema.fields().len(), 0);
Ok(())
}

#[tokio::test]
async fn test_json_array_nested_struct() -> Result<()> {
let schema = infer_json_array_schema(
r#"[{"id": 1, "info": {"name": "Alice", "age": 30}}]"#,
)
.await?;

let info_field = schema.field_with_name("info").unwrap();
assert!(matches!(info_field.data_type(), DataType::Struct(_)));
Ok(())
}

#[tokio::test]
async fn test_json_array_list_type() -> Result<()> {
let schema =
infer_json_array_schema(r#"[{"id": 1, "tags": ["a", "b", "c"]}]"#).await?;

let tags_field = schema.field_with_name("tags").unwrap();
assert!(matches!(tags_field.data_type(), DataType::List(_)));
Ok(())
}

#[tokio::test]
async fn test_json_array_basic_query() -> Result<()> {
let result = query_json_array_str(
r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}, {"a": 3, "b": "test"}]"#,
"SELECT a, b FROM test_table ORDER BY a",
)
.await?;

assert_snapshot!(result, @r"
+---+-------+
| a | b |
+---+-------+
| 1 | hello |
| 2 | world |
| 3 | test |
+---+-------+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_with_nulls() -> Result<()> {
let result = query_json_array_str(
r#"[{"id": 1, "name": "Alice"}, {"id": 2, "name": null}, {"id": 3, "name": "Charlie"}]"#,
"SELECT id, name FROM test_table ORDER BY id",
)
.await?;

assert_snapshot!(result, @r"
+----+---------+
| id | name |
+----+---------+
| 1 | Alice |
| 2 | |
| 3 | Charlie |
+----+---------+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_unnest() -> Result<()> {
let result = query_json_array_str(
r#"[{"id": 1, "values": [10, 20, 30]}, {"id": 2, "values": [40, 50]}]"#,
"SELECT id, unnest(values) as value FROM test_table ORDER BY id, value",
)
.await?;

assert_snapshot!(result, @r"
+----+-------+
| id | value |
+----+-------+
| 1 | 10 |
| 1 | 20 |
| 1 | 30 |
| 2 | 40 |
| 2 | 50 |
+----+-------+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_unnest_struct() -> Result<()> {
let result = query_json_array_str(
r#"[{"id": 1, "orders": [{"product": "A", "qty": 2}, {"product": "B", "qty": 3}]}, {"id": 2, "orders": [{"product": "C", "qty": 1}]}]"#,
"SELECT id, unnest(orders)['product'] as product, unnest(orders)['qty'] as qty FROM test_table ORDER BY id, product",
)
.await?;

assert_snapshot!(result, @r"
+----+---------+-----+
| id | product | qty |
+----+---------+-----+
| 1 | A | 2 |
| 1 | B | 3 |
| 2 | C | 1 |
+----+---------+-----+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_nested_struct_access() -> Result<()> {
let result = query_json_array_str(
r#"[{"id": 1, "dept": {"name": "Engineering", "head": "Alice"}}, {"id": 2, "dept": {"name": "Sales", "head": "Bob"}}]"#,
"SELECT id, dept['name'] as dept_name, dept['head'] as head FROM test_table ORDER BY id",
)
.await?;

assert_snapshot!(result, @r"
+----+-------------+-------+
| id | dept_name | head |
+----+-------------+-------+
| 1 | Engineering | Alice |
| 2 | Sales | Bob |
+----+-------------+-------+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_with_compression() -> Result<()> {
use flate2::Compression;
use flate2::write::GzEncoder;
use std::io::Write;

let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/array.json.gz", tmp_dir.path().to_string_lossy());

let file = std::fs::File::create(&path)?;
let mut encoder = GzEncoder::new(file, Compression::default());
encoder.write_all(
r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#.as_bytes(),
)?;
encoder.finish()?;

let ctx = SessionContext::new();
let options = NdJsonReadOptions::default()
.newline_delimited(false)
.file_compression_type(FileCompressionType::GZIP)
.file_extension(".json.gz");

ctx.register_json("test_table", &path, options).await?;
let result = ctx
.sql("SELECT a, b FROM test_table ORDER BY a")
.await?
.collect()
.await?;

assert_snapshot!(batches_to_string(&result), @r"
+---+-------+
| a | b |
+---+-------+
| 1 | hello |
| 2 | world |
+---+-------+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_list_of_structs() -> Result<()> {
let batches = query_json_array(
r#"[{"id": 1, "items": [{"name": "x", "price": 10.5}]}, {"id": 2, "items": []}]"#,
"SELECT id, items FROM test_table ORDER BY id",
)
.await?;

assert_eq!(1, batches.len());
assert_eq!(2, batches[0].num_rows());
Ok(())
}
}