Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 40 additions & 33 deletions crates/flow/src/targets/d1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,40 +300,46 @@ impl D1ExportContext {
key: &KeyValue,
values: &FieldValues,
) -> Result<(String, Vec<serde_json::Value>), RecocoError> {
let mut columns = vec![];
let mut placeholders = vec![];
let mut params = vec![];
let mut update_clauses = vec![];
use std::fmt::Write;

// ⚑ Bolt: Use String::with_capacity and Write to avoid intermediate Vec allocations and format!
let mut params = Vec::with_capacity(self.key_fields_schema.len() + self.value_fields_schema.len());
let mut sql = String::with_capacity(256);
let _ = write!(sql, "INSERT INTO {} (", self.table_name);

// Extract key parts - KeyValue is a wrapper around Box<[KeyPart]>
for (idx, _key_field) in self.key_fields_schema.iter().enumerate() {
let mut num_columns = 0;
for (idx, key_field) in self.key_fields_schema.iter().enumerate() {
if let Some(key_part) = key.0.get(idx) {
columns.push(self.key_fields_schema[idx].name.clone());
placeholders.push("?".to_string());
if num_columns > 0 { let _ = write!(sql, ", "); }
let _ = write!(sql, "{}", key_field.name);
params.push(key_part_to_json(key_part)?);
num_columns += 1;
}
}

// Add value fields
for (idx, value) in values.fields.iter().enumerate() {
if let Some(value_field) = self.value_fields_schema.get(idx) {
columns.push(value_field.name.clone());
placeholders.push("?".to_string());
if num_columns > 0 { let _ = write!(sql, ", "); }
let _ = write!(sql, "{}", value_field.name);
params.push(value_to_json(value)?);
update_clauses.push(format!(
"{} = excluded.{}",
value_field.name, value_field.name
));
num_columns += 1;
}
}

let sql = format!(
"INSERT INTO {} ({}) VALUES ({}) ON CONFLICT DO UPDATE SET {}",
self.table_name,
columns.join(", "),
placeholders.join(", "),
update_clauses.join(", ")
);
let _ = write!(sql, ") VALUES (");
for i in 0..num_columns {
if i > 0 { let _ = write!(sql, ", "); }
let _ = write!(sql, "?");
}

let _ = write!(sql, ") ON CONFLICT DO UPDATE SET ");
let mut num_updates = 0;
for (idx, _value) in values.fields.iter().enumerate() {
if let Some(value_field) = self.value_fields_schema.get(idx) {
if num_updates > 0 { let _ = write!(sql, ", "); }
let _ = write!(sql, "{} = excluded.{}", value_field.name, value_field.name);
num_updates += 1;
}
}

Ok((sql, params))
}
Expand All @@ -342,22 +348,23 @@ impl D1ExportContext {
&self,
key: &KeyValue,
) -> Result<(String, Vec<serde_json::Value>), RecocoError> {
let mut where_clauses = vec![];
let mut params = vec![];
use std::fmt::Write;

for (idx, _key_field) in self.key_fields_schema.iter().enumerate() {
// ⚑ Bolt: Use String::with_capacity and Write to avoid intermediate Vec allocations and format!
let mut params = Vec::with_capacity(self.key_fields_schema.len());
let mut sql = String::with_capacity(128);
let _ = write!(sql, "DELETE FROM {} WHERE ", self.table_name);

let mut first = true;
for (idx, key_field) in self.key_fields_schema.iter().enumerate() {
if let Some(key_part) = key.0.get(idx) {
where_clauses.push(format!("{} = ?", self.key_fields_schema[idx].name));
if !first { let _ = write!(sql, " AND "); }
let _ = write!(sql, "{} = ?", key_field.name);
params.push(key_part_to_json(key_part)?);
first = false;
}
}

let sql = format!(
"DELETE FROM {} WHERE {}",
self.table_name,
where_clauses.join(" AND ")
);

Ok((sql, params))
}

Expand Down