Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions gateway/gateway-controller/pkg/api/management/generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

256 changes: 254 additions & 2 deletions gateway/gateway-controller/pkg/controlplane/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,13 +939,13 @@ func (c *Client) syncAPIKeysForExistingArtifacts(gatewayID string) {
continue
}
if cfg.Kind != models.KindLlmProvider && cfg.Kind != models.KindLlmProxy &&
cfg.Kind != models.KindRestApi && cfg.Kind != models.KindWebSubApi {
cfg.Kind != models.KindRestApi && cfg.Kind != models.KindWebSubApi && cfg.Kind != models.KindWebBrokerApi {
continue
}
artifactUUIDsByKind[cfg.Kind] = append(artifactUUIDsByKind[cfg.Kind], cfg.UUID)
}

for _, kind := range []string{models.KindRestApi, models.KindWebSubApi, models.KindLlmProvider, models.KindLlmProxy} {
for _, kind := range []string{models.KindRestApi, models.KindWebSubApi, models.KindWebBrokerApi, models.KindLlmProvider, models.KindLlmProxy} {
select {
case <-c.ctx.Done():
c.logger.Info("Stopping API key bulk sync due to client context cancellation")
Expand Down Expand Up @@ -1301,6 +1301,12 @@ func (c *Client) handleMessage(messageType int, message []byte) {
c.handleWebSubAPIUndeployedEvent(event)
case "websub.deleted":
c.handleWebSubAPIDeletedEvent(event)
case "webbroker.deployed":
c.handleWebBrokerAPIDeployedEvent(event)
case "webbroker.undeployed":
c.handleWebBrokerAPIUndeployedEvent(event)
case "webbroker.deleted":
c.handleWebBrokerAPIDeletedEvent(event)
case "application.updated":
c.handleApplicationUpdatedEvent(event)
default:
Expand Down Expand Up @@ -2709,6 +2715,252 @@ func (c *Client) handleWebSubAPIDeletedEvent(event map[string]any) {
c.performFullAPIDeletion(apiID, apiConfig, deletedEvent.CorrelationID)
}

func (c *Client) handleWebBrokerAPIDeployedEvent(event map[string]any) {
c.logger.Debug("WebBroker API Deployment Event",
slog.Any("payload", event["payload"]),
slog.Any("timestamp", event["timestamp"]),
slog.Any("correlationId", event["correlationId"]),
)

eventBytes, err := json.Marshal(event)
if err != nil {
c.logger.Error("Failed to marshal WebBroker API deployment event for parsing",
slog.Any("error", err),
)
return
}

var deployedEvent WebBrokerAPIDeployedEvent
if err := json.Unmarshal(eventBytes, &deployedEvent); err != nil {
c.logger.Error("Failed to parse WebBroker API deployment event",
slog.Any("error", err),
)
return
}

apiID := deployedEvent.Payload.APIID
if apiID == "" {
c.logger.Error("API ID is empty in WebBroker API deployment event")
return
}

c.logger.Info("Processing WebBroker API deployment",
slog.String("api_id", apiID),
slog.String("deployment_id", deployedEvent.Payload.DeploymentID),
slog.String("correlation_id", deployedEvent.CorrelationID),
)

// Fetch WebBroker API definition from control plane
zipData, err := c.apiUtilsService.FetchWebBrokerAPIDefinition(apiID)
if err != nil {
c.logger.Error("Failed to fetch WebBroker API definition",
slog.String("api_id", apiID),
slog.Any("error", err),
)
c.sendDeploymentAck(deployedEvent.Payload.DeploymentID, apiID, "webbroker", "deploy", "failed",
deployedEvent.Payload.PerformedAt, "GATEWAY_PROCESSING_ERROR")
return
}

yamlData, err := c.apiUtilsService.ExtractYAMLFromZip(zipData)
if err != nil {
c.logger.Error("Failed to extract YAML from WebBroker API ZIP",
slog.String("api_id", apiID),
slog.Any("error", err),
)
c.sendDeploymentAck(deployedEvent.Payload.DeploymentID, apiID, "webbroker", "deploy", "failed",
deployedEvent.Payload.PerformedAt, "GATEWAY_PROCESSING_ERROR")
return
}

performedAt := deployedEvent.Payload.PerformedAt.Truncate(time.Millisecond)
if performedAt.IsZero() {
performedAt = time.Now().Truncate(time.Millisecond)
}
result, err := c.apiUtilsService.CreateAPIFromYAML(yamlData, apiID, deployedEvent.Payload.DeploymentID, &performedAt, deployedEvent.CorrelationID, c.deploymentService)
if err != nil {
c.logger.Error("Failed to create WebBroker API from YAML",
slog.String("api_id", apiID),
slog.Any("error", err),
)
c.sendDeploymentAck(deployedEvent.Payload.DeploymentID, apiID, "webbroker", "deploy", "failed",
deployedEvent.Payload.PerformedAt, "GATEWAY_PROCESSING_ERROR")
return
}

if result.IsStale {
c.logger.Debug("Skipped stale WebBroker API deploy event (newer version exists in DB)",
slog.String("api_id", apiID),
slog.String("deployment_id", deployedEvent.Payload.DeploymentID),
)
return
}

c.sendDeploymentAck(deployedEvent.Payload.DeploymentID, apiID, "webbroker", "deploy", "success",
deployedEvent.Payload.PerformedAt, "")

c.logger.Info("Successfully processed WebBroker API deployment event",
slog.String("api_id", apiID),
slog.String("correlation_id", deployedEvent.CorrelationID),
)
}

func (c *Client) handleWebBrokerAPIUndeployedEvent(event map[string]any) {
c.logger.Debug("WebBroker API Undeployment Event",
slog.Any("payload", event["payload"]),
slog.Any("timestamp", event["timestamp"]),
slog.Any("correlationId", event["correlationId"]),
)

eventBytes, err := json.Marshal(event)
if err != nil {
c.logger.Error("Failed to marshal WebBroker API undeployment event for parsing",
slog.Any("error", err),
)
return
}

var undeployedEvent WebBrokerAPIUndeployedEvent
if err := json.Unmarshal(eventBytes, &undeployedEvent); err != nil {
c.logger.Error("Failed to parse WebBroker API undeployment event",
slog.Any("error", err),
)
return
}

apiID := undeployedEvent.Payload.APIID
if apiID == "" {
c.logger.Error("API ID is empty in WebBroker API undeployment event")
return
}

apiConfig, err := c.findAPIConfig(apiID)
if err != nil {
if storage.IsNotFoundError(err) {
c.logger.Warn("WebBroker API configuration not found for undeployment",
slog.String("api_id", apiID),
)
c.sendDeploymentAck(undeployedEvent.Payload.DeploymentID, apiID, "webbroker", "undeploy", "success",
undeployedEvent.Payload.PerformedAt, "")
return
}
c.logger.Error("Failed to fetch WebBroker API configuration for undeployment",
slog.String("api_id", apiID),
slog.String("correlation_id", undeployedEvent.CorrelationID),
slog.Any("error", err),
)
c.sendDeploymentAck(undeployedEvent.Payload.DeploymentID, apiID, "webbroker", "undeploy", "failed",
undeployedEvent.Payload.PerformedAt, "GATEWAY_PROCESSING_ERROR")
return
}

if apiConfig.DeploymentID != "" && undeployedEvent.Payload.DeploymentID != "" &&
apiConfig.DeploymentID != undeployedEvent.Payload.DeploymentID {
c.logger.Warn("Ignoring stale WebBroker API undeploy event: deployment ID mismatch",
slog.String("api_id", apiID),
slog.String("event_deployment_id", undeployedEvent.Payload.DeploymentID),
slog.String("current_deployment_id", apiConfig.DeploymentID),
)
c.sendDeploymentAck(undeployedEvent.Payload.DeploymentID, apiID, "webbroker", "undeploy", "failed",
undeployedEvent.Payload.PerformedAt, "DEPLOYMENT_ID_MISMATCH")
return
}

performedAt := undeployedEvent.Payload.PerformedAt.Truncate(time.Millisecond)
if performedAt.IsZero() {
performedAt = time.Now().Truncate(time.Millisecond)
}
apiConfig.DesiredState = models.StateUndeployed
apiConfig.DeploymentID = undeployedEvent.Payload.DeploymentID
apiConfig.DeployedAt = &performedAt
apiConfig.UpdatedAt = time.Now()

affected, err := c.db.UpsertConfig(apiConfig)
if err != nil {
c.logger.Error("Failed to upsert config for WebBroker API undeployment",
slog.String("api_id", apiID),
slog.Any("error", err),
)
c.sendDeploymentAck(undeployedEvent.Payload.DeploymentID, apiID, "webbroker", "undeploy", "failed",
undeployedEvent.Payload.PerformedAt, "GATEWAY_PROCESSING_ERROR")
return
}
if !affected {
c.logger.Debug("Skipped stale WebBroker API undeploy event (newer version exists in DB)",
slog.String("api_id", apiID),
slog.String("deployment_id", undeployedEvent.Payload.DeploymentID),
)
return
}

evt := eventhub.Event{
EventType: eventhub.EventTypeAPI,
Action: "UPDATE",
EntityID: apiID,
EventID: undeployedEvent.CorrelationID,
}
if err := c.eventHub.PublishEvent(c.gatewayID, evt); err != nil {
c.logger.Error("Failed to publish WebBroker API undeployment event", slog.Any("error", err))
}

c.sendDeploymentAck(undeployedEvent.Payload.DeploymentID, apiID, "webbroker", "undeploy", "success",
undeployedEvent.Payload.PerformedAt, "")

c.logger.Info("Successfully processed WebBroker API undeployment event",
slog.String("api_id", apiID),
slog.String("correlation_id", undeployedEvent.CorrelationID),
)
}

func (c *Client) handleWebBrokerAPIDeletedEvent(event map[string]any) {
c.logger.Debug("WebBroker API Deleted Event",
slog.Any("payload", event["payload"]),
slog.Any("timestamp", event["timestamp"]),
slog.Any("correlationId", event["correlationId"]),
)

eventBytes, err := json.Marshal(event)
if err != nil {
c.logger.Error("Failed to marshal WebBroker API deleted event for parsing",
slog.Any("error", err),
)
return
}

var deletedEvent WebBrokerAPIDeletedEvent
if err := json.Unmarshal(eventBytes, &deletedEvent); err != nil {
c.logger.Error("Failed to parse WebBroker API deleted event",
slog.Any("error", err),
)
return
}

apiID := deletedEvent.Payload.APIID
if apiID == "" {
c.logger.Error("API ID is empty in WebBroker API deleted event")
return
}

apiConfig, err := c.findAPIConfig(apiID)
if err != nil {
if storage.IsNotFoundError(err) {
c.logger.Warn("WebBroker API configuration not found for deletion; running orphan cleanup",
slog.String("api_id", apiID),
)
c.cleanupOrphanedResources(apiID, deletedEvent.CorrelationID)
return
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
c.logger.Error("Failed to fetch WebBroker API configuration for deletion",
slog.String("api_id", apiID),
slog.String("correlation_id", deletedEvent.CorrelationID),
slog.Any("error", err),
)
return
}

c.performFullAPIDeletion(apiID, apiConfig, deletedEvent.CorrelationID)
}

func (c *Client) handleMCPProxyDeploymentEvent(event map[string]any) {
c.logger.Debug("MCP Proxy Deployment Event",
slog.Any("payload", event["payload"]),
Expand Down
Loading
Loading