Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,7 @@ func (h *OpenAPIV2) MoveTable(c *gin.Context) {
return
}
}
err = maintainer.MoveTable(int64(tableId), node.ID(targetNodeID), mode, wait)
err = maintainer.MoveTable(tableId, node.ID(targetNodeID), mode, wait)
if err != nil {
log.Error("failed to move table", zap.Error(err), zap.Int64("tableID", tableId), zap.String("targetNodeID", targetNodeID))
_ = c.Error(err)
Expand Down Expand Up @@ -1250,7 +1250,7 @@ func (h *OpenAPIV2) MoveSplitTable(c *gin.Context) {

targetNodeID := c.Query("targetNodeID")
mode, _ := strconv.ParseInt(c.Query("mode"), 10, 64)
err = maintainer.MoveSplitTable(int64(tableId), node.ID(targetNodeID), mode)
err = maintainer.MoveSplitTable(tableId, node.ID(targetNodeID), mode)
if err != nil {
log.Error("failed to move split table", zap.Error(err), zap.Int64("tableID", tableId), zap.String("targetNodeID", targetNodeID))
_ = c.Error(err)
Expand Down Expand Up @@ -1325,7 +1325,7 @@ func (h *OpenAPIV2) SplitTableByRegionCount(c *gin.Context) {
return
}
mode, _ := strconv.ParseInt(c.Query("mode"), 10, 64)
err = maintainer.SplitTableByRegionCount(int64(tableId), mode)
err = maintainer.SplitTableByRegionCount(tableId, mode)
if err != nil {
log.Error("failed to split table by region count", zap.Error(err), zap.Int64("tableID", tableId))
_ = c.Error(err)
Expand Down Expand Up @@ -1395,7 +1395,7 @@ func (h *OpenAPIV2) MergeTable(c *gin.Context) {
}

mode, _ := strconv.ParseInt(c.Query("mode"), 10, 64)
err = maintainer.MergeTable(int64(tableId), mode)
err = maintainer.MergeTable(tableId, mode)
if err != nil {
log.Error("failed to merge table", zap.Error(err), zap.Int64("tableID", tableId))
_ = c.Error(err)
Expand Down
3 changes: 1 addition & 2 deletions cmd/kafka-consumer/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,14 +311,13 @@ func TestAppendRow2Group_DoesNotDropCommitTsFallbackBeforeApplied(t *testing.T)
group := progress.eventsGroup[1]
require.NotNil(t, group)

resolvedEvents := make([]*commonEvent.DMLEvent, 0)
// Expect: commitTs=100 is still kept and can be resolved.
resolved := group.ResolveInto(150, nil)
require.Len(t, resolved, 1)
require.Equal(t, uint64(100), resolved[0].CommitTs)

// Step 3: once downstream has flushed beyond commitTs=100, the replay is safe to ignore.
resolvedEvents = make([]*commonEvent.DMLEvent, 0)
resolvedEvents := make([]*commonEvent.DMLEvent, 0)
group.AppliedWatermark = 200
w.appendRow2Group(newDMLEvent(1, 100), progress, kafka.Offset(12))
resolved = group.ResolveInto(150, resolvedEvents)
Expand Down
2 changes: 1 addition & 1 deletion cmd/pulsar-consumer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func newWriter(ctx context.Context, o *option) *writer {
o.replicaConfig.Sink.TiDBSourceID = 1
o.replicaConfig.Sink.Protocol = putil.AddressOf(o.protocol.String())

for i := 0; i < int(o.partitionNum); i++ {
for i := 0; i < o.partitionNum; i++ {
decoder, err := codec.NewEventDecoder(ctx, i, codecConfig, o.topic, db)
if err != nil {
log.Panic("cannot create the decoder", zap.Error(err))
Expand Down
7 changes: 3 additions & 4 deletions coordinator/changefeed/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ import (
"time"

"github.com/cenkalti/backoff/v4"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/config"
cerrors "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/errors"
Comment thread
3AceShowHand marked this conversation as resolved.
"go.uber.org/atomic"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -182,13 +181,13 @@ func (m *Backoff) StartFinished() {

// ShouldFailChangefeed return true if a running error contains a changefeed not retry error.
func ShouldFailChangefeed(e *heartbeatpb.RunningError) bool {
return cerrors.ShouldFailChangefeed(errors.New(e.Message + e.Code))
return errors.ShouldFailChangefeed(errors.New(e.Message + e.Code))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Search for RunningError construction sites to verify Message/Code format.

# Find where RunningError structs are created
rg -nP --type=go -C3 'RunningError\s*\{' -g '!vendor/'

# Find where RunningError.Message or RunningError.Code are assigned
rg -nP --type=go -C3 '\.(Message|Code)\s*=' -g '!vendor/' -g '*heartbeatpb*'

Repository: pingcap/ticdc

Length of output: 20200


🏁 Script executed:

# Read the backoff.go file around line 184 to see the full function context
cat -n coordinator/changefeed/backoff.go | sed -n '170,200p'

Repository: pingcap/ticdc

Length of output: 1186


🏁 Script executed:

# Check the RunningError struct definition in protobuf
rg -n --type=go 'type RunningError struct' -A 10

Repository: pingcap/ticdc

Length of output: 2413


🏁 Script executed:

# Look for other usages of ShouldFailChangefeed to understand expected error message format
rg -n --type=go 'ShouldFailChangefeed' -B 3 -A 3

Repository: pingcap/ticdc

Length of output: 4109


🏁 Script executed:

# Check how e.Message is populated in the backoff code
rg -n --type=go 'e\.Message' coordinator/changefeed/backoff.go -B 2 -A 2

Repository: pingcap/ticdc

Length of output: 304


🏁 Script executed:

# Check how ShouldFailChangefeed in errors package actually works
cat -n pkg/errors/helper.go | sed -n '129,160p'

Repository: pingcap/ticdc

Length of output: 1053


🏁 Script executed:

# Check how ErrorCode function extracts the code from errors
rg -n --type=go 'func ErrorCode' -A 8

Repository: pingcap/ticdc

Length of output: 563


🏁 Script executed:

# Look for RFC or error code related helper functions
rg -n --type=go 'RFCErrorCode' -B 2 -A 5

Repository: pingcap/ticdc

Length of output: 3918


🏁 Script executed:

# Check what the ShouldFailChangefeed function at line 130 in helper.go actually checks
rg -n --type=go 'strings.Contains' pkg/errors/helper.go -B 2 -A 2

Repository: pingcap/ticdc

Length of output: 446


🏁 Script executed:

# Look for test cases that show how ShouldFailChangefeed is expected to work
cat -n pkg/errors/error_test.go | sed -n '23,100p'

Repository: pingcap/ticdc

Length of output: 3023


🏁 Script executed:

# Check the changefeedUnRetryableErrors list to understand what it contains
rg -n --type=go 'changefeedUnRetryableErrors' pkg/errors/helper.go -B 5 -A 5

Repository: pingcap/ticdc

Length of output: 1177


🏁 Script executed:

# Check if there are any other examples of how Message and Code are used together
rg -n --type=go 'Message.*Code' coordinator/changefeed/ -A 2 -B 2

Repository: pingcap/ticdc

Length of output: 474


🏁 Script executed:

# Look at how the error is supposed to be formatted by checking the comment or intent
rg -n --type=go 'RunningError' coordinator/changefeed/backoff.go -B 5 -A 5

Repository: pingcap/ticdc

Length of output: 1780


🏁 Script executed:

# Check if there's any documentation about how the Code field is structured
rg -n --type=go 'RFC.*Code\|error.*code' pkg/errors/helper.go | head -20

Repository: pingcap/ticdc

Length of output: 39


🏁 Script executed:

# Look at actual test cases to see what format is expected for e.Code and e.Message
cat -n coordinator/changefeed/backoff_test.go | sed -n '35,70p'

Repository: pingcap/ticdc

Length of output: 1308


🏁 Script executed:

# Check if there are any examples showing both Code and Message populated together
rg -n --type=go 'Code:.*Message:' coordinator/changefeed/ -B 1 -A 1

Repository: pingcap/ticdc

Length of output: 280


🏁 Script executed:

# Look for any formatter or helper that should be used for RunningError
rg -n --type=go 'func.*RunningError.*String\|func.*Format' -A 5

Repository: pingcap/ticdc

Length of output: 39


Fix error message formatting by adding separator and correcting order.

The concatenation e.Message + e.Code produces malformed error strings like "snapshot lost by goCDC:ErrSnapshotLostByGC" with no separator and reversed order. Use e.Code + ": " + e.Message to follow standard error formatting conventions (error code followed by message).

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@coordinator/changefeed/backoff.go` at line 184, The error string is built in
the return statement in backoff.go using e.Message + e.Code which yields
reversed and unseparated values; update the construction inside the call to
errors.ShouldFailChangefeed to use the error code first and a separator before
the message (i.e., build the string as e.Code + ": " + e.Message) so the
resulting error reads like "ErrXYZ: description" instead of concatenated words.

}

func (m *Backoff) HandleError(errs []*heartbeatpb.RunningError) (bool, *heartbeatpb.RunningError) {
// if there are a fastFail error in errs, we can just fastFail the changefeed
for _, err := range errs {
if cerrors.IsChangefeedGCFastFailErrorCode(errors.RFCErrorCode(err.Code)) ||
if errors.IsChangefeedGCFastFailErrorCode(errors.RFCErrorCode(err.Code)) ||
ShouldFailChangefeed(err) {
return true, err
}
Expand Down
15 changes: 7 additions & 8 deletions coordinator/changefeed/etcd_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ import (
"fmt"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/errors"
Comment thread
3AceShowHand marked this conversation as resolved.
"github.com/pingcap/ticdc/pkg/etcd"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
Expand Down Expand Up @@ -154,7 +153,7 @@ func (b *EtcdBackend) CreateChangefeed(ctx context.Context,
return errors.Trace(err)
}
if !resp.Succeeded {
err = cerror.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("create changefeed %s", info.ChangefeedID.Name()))
err = errors.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("create changefeed %s", info.ChangefeedID.Name()))
return errors.Trace(err)
}
return nil
Expand Down Expand Up @@ -186,7 +185,7 @@ func (b *EtcdBackend) UpdateChangefeed(ctx context.Context, info *config.ChangeF
return errors.Trace(err)
}
if !putResp.Succeeded {
err = cerror.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("update changefeed %s failed", info.ChangefeedID.Name()))
err = errors.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("update changefeed %s failed", info.ChangefeedID.Name()))
return errors.Trace(err)
}
return nil
Expand Down Expand Up @@ -223,7 +222,7 @@ func (b *EtcdBackend) PauseChangefeed(ctx context.Context, id common.ChangeFeedI
return errors.Trace(err)
}
if !putResp.Succeeded {
err = cerror.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("pause changefeed %s failed", id.DisplayName))
err = errors.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("pause changefeed %s failed", id.DisplayName))
return errors.Trace(err)
}
return nil
Expand All @@ -242,7 +241,7 @@ func (b *EtcdBackend) DeleteChangefeed(ctx context.Context,
return errors.Trace(err)
}
if !resp.Succeeded {
err = cerror.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("delete changefeed %s", changefeedID.Name()))
err = errors.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("delete changefeed %s", changefeedID.Name()))
return errors.Trace(err)
}
return nil
Expand Down Expand Up @@ -284,7 +283,7 @@ func (b *EtcdBackend) ResumeChangefeed(ctx context.Context,
return errors.Trace(err)
}
if !putResp.Succeeded {
err = cerror.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("resume changefeed %s", info.ChangefeedID.Name()))
err = errors.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("resume changefeed %s", info.ChangefeedID.Name()))
return errors.Trace(err)
}
return nil
Expand Down Expand Up @@ -336,7 +335,7 @@ func (b *EtcdBackend) SetChangefeedProgress(ctx context.Context, id common.Chang
}
}

err := cerror.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("update changefeed to %s-%d", id.DisplayName, progress))
err := errors.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("update changefeed to %s-%d", id.DisplayName, progress))
return errors.Trace(err)
}

Expand Down
2 changes: 1 addition & 1 deletion coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func newMockEtcdClient(ownerID string) *mockEtcdClient {
}

func (m *mockEtcdClient) GetOwnerID(ctx context.Context) (config.CaptureID, error) {
return config.CaptureID(m.ownerID), nil
return m.ownerID, nil
}

func TestCoordinatorScheduling(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions coordinator/create_changefeed_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ func TestUpdateGCSafepointCallsGCManagerUpdate(t *testing.T) {

if kerneltype.IsClassic() {
gcManager.EXPECT().
TryUpdateServiceGCSafepoint(gomock.Any(), common.Ts(info.StartTs-1)).
TryUpdateServiceGCSafepoint(gomock.Any(), info.StartTs-1).
Return(nil).Times(1)
} else {
gcManager.EXPECT().
TryUpdateKeyspaceGCBarrier(gomock.Any(), gomock.Any(), gomock.Any(), common.Ts(info.StartTs-1)).
TryUpdateKeyspaceGCBarrier(gomock.Any(), gomock.Any(), gomock.Any(), info.StartTs-1).
Return(nil).Times(1)
}

Expand Down Expand Up @@ -263,7 +263,7 @@ func TestConcurrentDeleteLastChangefeedAndCreateNewOneKeepsExpectedGCSafepoint(t
TryDeleteServiceGCSafepoint(gomock.Any()).
Times(0)
gcManager.EXPECT().
TryUpdateServiceGCSafepoint(gomock.Any(), common.Ts(newInfo.StartTs-1)).
TryUpdateServiceGCSafepoint(gomock.Any(), newInfo.StartTs-1).
Return(nil).
Times(1)

Expand Down
2 changes: 1 addition & 1 deletion downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ func (c *EventCollector) newCongestionControlMessages() map[node.ID]*event.Conge

// get available memory for this dispatcher
if pathMemory, exists := changefeedPathMemory[changefeedID][dispatcherID]; exists {
nodeDispatcherMemory[eventServiceID][changefeedID][dispatcherID] = uint64(pathMemory)
nodeDispatcherMemory[eventServiceID][changefeedID][dispatcherID] = pathMemory
}
return true
})
Expand Down
4 changes: 2 additions & 2 deletions downstreamadapter/eventcollector/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (h *EventsHandler) Handle(stat *dispatcherStat, events ...dispatcher.Dispat
stat.handleHandshakeEvent(events[0])
return false
default:
log.Panic("unknown event type", zap.Int("type", int(events[0].GetType())))
log.Panic("unknown event type", zap.Int("type", events[0].GetType()))
}
return false
}
Expand Down Expand Up @@ -145,7 +145,7 @@ func (h *EventsHandler) GetType(event dispatcher.DispatcherEvent) dynstream.Even
case commonEvent.TypeDropEvent:
return dynstream.EventType{DataGroup: DataGroupDrop, Property: dynstream.NonBatchable, Droppable: false}
default:
log.Panic("unknown event type", zap.Int("type", int(event.GetType())))
log.Panic("unknown event type", zap.Int("type", event.GetType()))
}
return dynstream.DefaultEventType
}
Expand Down
2 changes: 1 addition & 1 deletion downstreamadapter/sink/mysql/causality/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func genRowKeys(row commonEvent.RowChange, tableInfo *common.TableInfo, dispatch
// no concurrence for rows in the same dispatcher.
log.Debug("Use dispatcherID as the key", zap.Any("dispatcherID", dispatcherID))
tableKey := make([]byte, 8)
binary.BigEndian.PutUint64(tableKey, uint64(dispatcherID.GetLow()))
binary.BigEndian.PutUint64(tableKey, dispatcherID.GetLow())
keys = [][]byte{tableKey}
}
return keys
Expand Down
2 changes: 1 addition & 1 deletion logservice/eventstore/event_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1104,7 +1104,7 @@ func TestWriteToEventStore(t *testing.T) {
CRTs: 211, // Note: must be different from smallEntry's CRTs to avoid key collision if key is same
KeyLen: uint32(len(largeEntryKey)),
ValueLen: uint32(len(largeEntryValue)) * uint32(store.compressionThreshold/10),
Key: []byte(largeEntryKey),
Key: largeEntryKey,
Value: bytes.Repeat(largeEntryValue, store.compressionThreshold/10),
OldValue: nil,
}
Expand Down
2 changes: 1 addition & 1 deletion logservice/logpuller/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"context"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/utils/heap"
)

Expand Down
2 changes: 1 addition & 1 deletion logservice/logpuller/region_request_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func newRegionRequestWorker(
}
var regionErr error
if err := version.CheckStoreVersion(ctx, worker.client.pd); err != nil {
if errors.Cause(err) == context.Canceled {
if cerror.Is(errors.Cause(err), context.Canceled) {
return nil
}
log.Error("event feed check store version fails",
Expand Down
2 changes: 1 addition & 1 deletion logservice/logpuller/region_request_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
"io"
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/ticdc/logservice/logpuller/regionlock"
"github.com/pingcap/ticdc/pkg/errors"
Comment thread
3AceShowHand marked this conversation as resolved.
"github.com/pingcap/ticdc/utils/dynstream"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
Expand Down
6 changes: 0 additions & 6 deletions logservice/schemastore/multi_version_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,6 @@ func buildRecoverTableEventForTest(schemaID, tableID int64, schemaName, tableNam
}

func buildCreatePartitionTableEventForTest(schemaID, tableID int64, schemaName, tableName string, partitionIDs []int64, finishedTs uint64) *PersistedDDLEvent {
partitionDefinitions := make([]model.PartitionDefinition, 0, len(partitionIDs))
for _, partitionID := range partitionIDs {
partitionDefinitions = append(partitionDefinitions, model.PartitionDefinition{
ID: partitionID,
})
}
return &PersistedDDLEvent{
Type: byte(model.ActionCreateTable),
SchemaID: schemaID,
Expand Down
4 changes: 2 additions & 2 deletions logservice/schemastore/persist_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ func (p *persistentStorage) handleDDLJob(job *model.Job) error {
for _, tableID := range tableIDs {
if store, ok := p.tableInfoStoreMap[tableID]; ok {
// do some safety check
switch model.ActionType(job.Type) {
switch job.Type {
case model.ActionCreateTable, model.ActionCreateTables:
// newly created tables should not be registered before this ddl are handled
log.Panic("should not be registered", zap.Int64("tableID", tableID))
Expand All @@ -776,7 +776,7 @@ func (p *persistentStorage) handleDDLJob(job *model.Job) error {
}

func shouldSkipDDL(job *model.Job, tableMap map[int64]*BasicTableInfo) bool {
switch model.ActionType(job.Type) {
switch job.Type {
// Skipping ActionCreateTable and ActionCreateTables when the table already exists:
// 1. It is possible to receive ActionCreateTable and ActionCreateTables multiple times,
// and filtering duplicates in a generic way is challenging.
Expand Down
Loading
Loading