Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ import app.softnetwork.elastic.schema.{Index, IndexMappings}
import app.softnetwork.elastic.sql.policy.{EnrichPolicy, EnrichPolicyTask}
import app.softnetwork.elastic.sql.{query, schema, PainlessContextType}
import app.softnetwork.elastic.sql.query.{
Delete,
Insert,
SQLAggregation,
SearchStatement,
SelectStatement,
SingleSearch
SingleSearch,
Update
}
import app.softnetwork.elastic.sql.schema.{Schema, TableAlias}
import app.softnetwork.elastic.sql.transform.{
Expand Down Expand Up @@ -212,6 +215,9 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
override def deleteByQuery(index: String, query: String, refresh: Boolean): ElasticResult[Long] =
delegate.deleteByQuery(index, query, refresh)

override def deleteByQuery(index: String, delete: Delete, refresh: Boolean): ElasticResult[Long] =
delegate.deleteByQuery(index, delete, refresh)

override def isIndexClosed(index: String): ElasticResult[Boolean] =
delegate.isIndexClosed(index)

Expand All @@ -236,6 +242,14 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
): ElasticResult[Long] =
delegate.updateByQuery(index, query, pipelineId, refresh)

override def updateByQuery(
index: String,
update: Update,
pipelineId: Option[String],
refresh: Boolean
): ElasticResult[Long] =
delegate.updateByQuery(index, update, pipelineId, refresh)

/** Insert documents by query into an index.
*
* @param index
Expand All @@ -252,6 +266,11 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
): Future[ElasticResult[DmlResult]] =
delegate.insertByQuery(index, query, refresh)

override def insertByQuery(index: String, insert: Insert, refresh: Boolean)(implicit
system: ActorSystem
): Future[ElasticResult[DmlResult]] =
delegate.insertByQuery(index, insert, refresh)

/** Load the schema for the provided index.
*
* @param index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ class DmlExecutor(api: IndicesApi, logger: Logger) extends Executor[DmlStatement
implicit val ec: ExecutionContext = system.dispatcher
statement match {
case delete: Delete =>
api.deleteByQuery(delete.table.name, delete.sql) match {
// Pass the AST directly — avoids the AST → Delete.sql → re-parse round-trip.
api.deleteByQuery(delete.table.name, delete, refresh = true) match {
case ElasticSuccess(count) =>
logger.info(s"✅ Deleted $count documents from ${delete.table.name}.")
Future.successful(ElasticResult.success(DmlResult(deleted = count)))
Expand All @@ -207,7 +208,9 @@ class DmlExecutor(api: IndicesApi, logger: Logger) extends Executor[DmlStatement
)
}
case update: Update =>
api.updateByQuery(update.table, update.sql) match {
// Pass the AST directly — avoids the AST → Update.sql → re-parse round-trip that
// historically dropped the syntactic shape of string literals (issue #92).
api.updateByQuery(update.table, update, None, refresh = true) match {
case ElasticSuccess(count) =>
logger.info(s"✅ Updated $count documents in ${update.table}.")
Future.successful(ElasticResult.success(DmlResult(updated = count)))
Expand All @@ -219,7 +222,8 @@ class DmlExecutor(api: IndicesApi, logger: Logger) extends Executor[DmlStatement
)
}
case insert: Insert =>
api.insertByQuery(insert.table, insert.sql).map {
// Pass the AST directly — avoids the AST → Insert.sql → re-parse round-trip.
api.insertByQuery(insert.table, insert, refresh = true).map {
case success @ ElasticSuccess(res) =>
logger.info(s"✅ Inserted ${res.inserted} documents into ${insert.table}.")
success
Expand Down
Loading
Loading