@@ -23,6 +23,7 @@ import (
2323 "github.com/stackql/stackql/internal/stackql/acid/binlog"
2424 "github.com/stackql/stackql/internal/stackql/drm"
2525 "github.com/stackql/stackql/internal/stackql/handler"
26+ "github.com/stackql/stackql/internal/stackql/internal_data_transfer/builder_input"
2627 "github.com/stackql/stackql/internal/stackql/internal_data_transfer/internaldto"
2728 "github.com/stackql/stackql/internal/stackql/primitive"
2829 "github.com/stackql/stackql/internal/stackql/primitivegraph"
@@ -84,6 +85,7 @@ type monoValentExecution struct {
8485 isMutation bool
8586 isAwait bool
8687 defaultHTTPClient * http.Client // for testing purposes only
88+ bldrInput builder_input.BuilderInput
8789}
8890
8991func NewMonoValentExecutorFactory (
@@ -97,6 +99,7 @@ func NewMonoValentExecutorFactory(
9799 isSkipResponse bool ,
98100 isMutation bool ,
99101 isAwait bool ,
102+ bldrInput builder_input.BuilderInput ,
100103) MonoValentExecutorFactory {
101104 var tcc internaldto.TxnControlCounters
102105 if insertCtx != nil {
@@ -127,6 +130,7 @@ func NewMonoValentExecutorFactory(
127130 isAwait : isAwait ,
128131 defaultHTTPClient : defaultHTTPClient ,
129132 invoker : anysdkhttp .New (),
133+ bldrInput : bldrInput ,
130134 }
131135}
132136
@@ -1257,6 +1261,59 @@ func (sp *standardProcessor) Process() processorResponse {
12571261 return newHTTPProcessorResponse (nil , reversalStream , false , nil )
12581262}
12591263
1264+ var (
1265+ _ formulation.BaseArmouryGenerator = (* reconsititutedarmouryGenerator )(nil )
1266+ )
1267+
1268+ type rawTransformer struct {
1269+ node sqlparser.SQLNode
1270+ insertValOnlyRows map [int ]map [int ]interface {}
1271+ }
1272+
1273+ func (rt * rawTransformer ) Transform () (map [int ]map [string ]interface {}, error ) {
1274+ return util .ExtractSQLNodeParams (rt .node , rt .insertValOnlyRows )
1275+ }
1276+
1277+ func newRawTransformer (
1278+ node sqlparser.SQLNode ,
1279+ insertValOnlyRows map [int ]map [int ]interface {},
1280+ ) * rawTransformer {
1281+ return & rawTransformer {
1282+ node : node ,
1283+ insertValOnlyRows : insertValOnlyRows ,
1284+ }
1285+ }
1286+
1287+ type reconsititutedarmouryGenerator struct {
1288+ prior formulation.BaseArmouryGenerator
1289+ updatedParams map [int ]map [string ]any
1290+ }
1291+
1292+ func newUpdatedArmouryGenerator (
1293+ prior formulation.BaseArmouryGenerator ,
1294+ updatedParams map [int ]map [string ]any ,
1295+ ) formulation.BaseArmouryGenerator {
1296+ return & reconsititutedarmouryGenerator {
1297+ prior : prior ,
1298+ updatedParams : updatedParams ,
1299+ }
1300+ }
1301+
1302+ func (r * reconsititutedarmouryGenerator ) GetHTTPArmoury () (formulation.HTTPArmoury , error ) {
1303+ if len (r .updatedParams ) == 0 {
1304+ return r .prior .GetHTTPArmoury ()
1305+ }
1306+ armoury , err := r .prior .GetHTTPArmoury ()
1307+ if err != nil {
1308+ return nil , err
1309+ }
1310+ updatedArmoury , err := armoury .MergeLateBindingMaps (r .updatedParams )
1311+ if err != nil {
1312+ return nil , err
1313+ }
1314+ return updatedArmoury , nil
1315+ }
1316+
12601317//nolint:revive,nestif,funlen,gocognit // TODO: investigate
12611318func (mv * monoValentExecution ) GetExecutor () (func (pc primitive.IPrimitiveCtx ) internaldto.ExecutorOutput , error ) {
12621319 prov , err := mv .tableMeta .GetProvider ()
@@ -1280,6 +1337,62 @@ func (mv *monoValentExecution) GetExecutor() (func(pc primitive.IPrimitiveCtx) i
12801337 return nil , authCtxErr
12811338 }
12821339 ex := func (pc primitive.IPrimitiveCtx ) internaldto.ExecutorOutput {
1340+ requiredDepedencyKey , requiredKeyExists := mv .bldrInput .GetRequiredDataRequestKey ()
1341+ // lateBindingData := map[int]map[string]any{}
1342+
1343+ rawData := map [int ]map [int ]any {}
1344+ if requiredKeyExists {
1345+ dataRequest , isRequestExist := mv .graphHolder .GetExecutorOutput (requiredDepedencyKey )
1346+ if ! isRequestExist {
1347+ return internaldto .NewErroneousExecutorOutput (fmt .Errorf ("required data request not found" ))
1348+ }
1349+ requiredData := dataRequest .GetRawResult ()
1350+ if requiredData == nil {
1351+ return internaldto .NewErroneousExecutorOutput (fmt .Errorf ("nil required data" ))
1352+ }
1353+ rowCount := 0
1354+ for res , err := requiredData .Read (); res != nil ; {
1355+ rows , rowErr := res .GetMap ()
1356+ if rowErr != nil {
1357+ return internaldto .NewErroneousExecutorOutput (fmt .Errorf ("error reading required data: %w" , rowErr ))
1358+ }
1359+ for k , r := range rows {
1360+ rawData [k ] = r
1361+ rowCount ++
1362+ }
1363+ logging .GetLogger ().Infoln (fmt .Sprintf ("read %d rows for required data with key '%s'" , len (rows ), requiredDepedencyKey ))
1364+ if err != nil {
1365+ if err == io .EOF {
1366+ break
1367+ }
1368+ return internaldto .NewErroneousExecutorOutput (fmt .Errorf ("error reading required data: %w" , err ))
1369+ }
1370+ break // yes weird, the api should change
1371+ }
1372+ if rowCount == 0 {
1373+ return internaldto .NewNopEmptyExecutorOutput ([]string {"operation skipped due to nil data" })
1374+ }
1375+ logging .GetLogger ().Infoln (fmt .Sprintf ("required data for key '%s' = %v" , requiredDepedencyKey , rawData ))
1376+ }
1377+
1378+ lateBindingData := map [int ]map [string ]any {}
1379+ var lateBindingErr error
1380+ if len (rawData ) > 0 {
1381+ parserNode , _ := mv .bldrInput .GetParserNode ()
1382+ transformer := newRawTransformer (
1383+ parserNode ,
1384+ rawData ,
1385+ )
1386+ lateBindingData , lateBindingErr = transformer .Transform ()
1387+ if lateBindingErr != nil {
1388+ return internaldto .NewErroneousExecutorOutput (fmt .Errorf ("error transforming required data: %w" , lateBindingErr ))
1389+ }
1390+ logging .GetLogger ().Infoln (fmt .Sprintf ("late binding data = %v" , lateBindingData ))
1391+ }
1392+ armouryGenerator := newUpdatedArmouryGenerator (
1393+ mv .tableMeta ,
1394+ lateBindingData ,
1395+ )
12831396 currentTcc := mv .insertPreparedStatementCtx .GetGCCtrlCtrs ().Clone ()
12841397 mv .graphHolder .AddTxnControlCounters (currentTcc )
12851398 mr := prov .InferMaxResultsElement (m )
@@ -1302,7 +1415,7 @@ func (mv *monoValentExecution) GetExecutor() (func(pc primitive.IPrimitiveCtx) i
13021415 inlines [1 :],
13031416 nil ,
13041417 )
1305- armoury , armouryErr := mv . tableMeta .GetHTTPArmoury ()
1418+ armoury , armouryErr := armouryGenerator .GetHTTPArmoury ()
13061419 if armouryErr != nil {
13071420 return internaldto .NewErroneousExecutorOutput (armouryErr )
13081421 }
@@ -1402,7 +1515,7 @@ func (mv *monoValentExecution) GetExecutor() (func(pc primitive.IPrimitiveCtx) i
14021515 case client .HTTP :
14031516 invRes , invErr := mv .invoker .Invoke (context .Background (), providerinvoker.Request {
14041517 Payload : formulation .NewPayload (
1405- mv . tableMeta ,
1518+ armouryGenerator ,
14061519 provider ,
14071520 m ,
14081521 tableName ,
0 commit comments