-
Notifications
You must be signed in to change notification settings - Fork 50
EC expiration #3718
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
EC expiration #3718
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,33 +1,45 @@ | ||
| package meta | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "fmt" | ||
| "slices" | ||
|
|
||
| "github.com/nspcc-dev/bbolt" | ||
| iec "github.com/nspcc-dev/neofs-node/internal/ec" | ||
| islices "github.com/nspcc-dev/neofs-node/internal/slices" | ||
| objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" | ||
| storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log" | ||
| cid "github.com/nspcc-dev/neofs-sdk-go/container/id" | ||
| "github.com/nspcc-dev/neofs-sdk-go/object" | ||
| oid "github.com/nspcc-dev/neofs-sdk-go/object/id" | ||
| "go.uber.org/zap" | ||
| ) | ||
|
|
||
| // RemovedObjects describes single item handled by [DB.Delete]. | ||
| type RemovedObject struct { | ||
| Address oid.Address | ||
| PayloadLen uint64 | ||
| } | ||
|
|
||
| // DeleteRes groups the resulting values of Delete operation. | ||
| type DeleteRes struct { | ||
| // Actually removed objects. First len(addrs) elements always contain addrs | ||
| // passed to [DB.Delete], but order is different in general. | ||
| RemovedObjects []RemovedObject | ||
| // RawRemoved contains the number of removed raw objects. | ||
| RawRemoved uint64 | ||
| // AvailableRemoved contains the number of removed available objects. | ||
| AvailableRemoved uint64 | ||
| // Sizes contains the sizes of removed objects. | ||
| // The order of the sizes is the same as in addresses' | ||
| // slice that was provided in the [DB.Delete] address list, | ||
| // meaning that i-th size equals the number of freed up bytes | ||
| // after removing an object by i-th address. A zero size is | ||
| // allowed, it claims a missing object. | ||
| Sizes []uint64 | ||
| } | ||
|
|
||
| // Delete removes object records from metabase indexes. | ||
| // Does not stop on an error if there are more objects to handle requested; | ||
| // returns the first error appeared with a number of deleted objects wrapped. | ||
| // | ||
| // Delete also looks up for objects that are hardly linked with elements of | ||
| // addrs list but not in the list themselves. If there are any, they are also | ||
| // deleted. | ||
| func (db *DB) Delete(addrs []oid.Address) (DeleteRes, error) { | ||
| db.modeMtx.RLock() | ||
| defer db.modeMtx.RUnlock() | ||
|
|
@@ -41,11 +53,11 @@ func (db *DB) Delete(addrs []oid.Address) (DeleteRes, error) { | |
| var rawRemoved uint64 | ||
| var availableRemoved uint64 | ||
| var err error | ||
| var sizes = make([]uint64, len(addrs)) | ||
| var removed []RemovedObject | ||
|
|
||
| err = db.boltDB.Update(func(tx *bbolt.Tx) error { | ||
| // We need to clear slice because tx can try to execute multiple times. | ||
| rawRemoved, availableRemoved, err = db.deleteGroup(tx, addrs, sizes) | ||
| rawRemoved, availableRemoved, removed, err = db.deleteGroup(tx, addrs) | ||
| return err | ||
| }) | ||
| if err == nil { | ||
|
|
@@ -58,7 +70,7 @@ func (db *DB) Delete(addrs []oid.Address) (DeleteRes, error) { | |
| return DeleteRes{ | ||
| RawRemoved: rawRemoved, | ||
| AvailableRemoved: availableRemoved, | ||
| Sizes: sizes, | ||
| RemovedObjects: removed, | ||
| }, err | ||
| } | ||
|
|
||
|
|
@@ -68,27 +80,32 @@ func (db *DB) Delete(addrs []oid.Address) (DeleteRes, error) { | |
| // objects that were stored. The second return value is a logical objects | ||
| // removed number: objects that were available (without Tombstones, GCMarks | ||
| // non-expired, etc.) | ||
| func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64) (uint64, uint64, error) { | ||
| func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (uint64, uint64, []RemovedObject, error) { | ||
| var rawDeleted uint64 | ||
| var availableDeleted uint64 | ||
| var errorCount int | ||
| var firstErr error | ||
|
|
||
| for i := range addrs { | ||
| removed, available, size, err := db.delete(tx, addrs[i]) | ||
| removedObjs, err := supplementRemovedObjects(tx, addrs) | ||
| if err != nil { | ||
| return 0, 0, nil, fmt.Errorf("extend removed objects: %w", err) | ||
| } | ||
|
|
||
| for i := range removedObjs { | ||
| removed, available, size, err := db.delete(tx, removedObjs[i].Address) | ||
| if err != nil { | ||
| errorCount++ | ||
| db.log.Warn("failed to delete object", zap.Stringer("addr", addrs[i]), zap.Error(err)) | ||
| db.log.Warn("failed to delete object", zap.Stringer("addr", removedObjs[i].Address), zap.Error(err)) | ||
| if firstErr == nil { | ||
| firstErr = fmt.Errorf("%s object delete fail: %w", addrs[i], err) | ||
| firstErr = fmt.Errorf("%s object delete fail: %w", removedObjs[i].Address, err) | ||
| } | ||
|
|
||
| continue | ||
| } | ||
|
|
||
| if removed { | ||
| rawDeleted++ | ||
| sizes[i] = size | ||
| removedObjs[i].PayloadLen = size | ||
| } | ||
|
|
||
| if available { | ||
|
|
@@ -97,26 +114,26 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64) (ui | |
| } | ||
|
|
||
| if firstErr != nil { | ||
| all := len(addrs) | ||
| all := len(removedObjs) | ||
| success := all - errorCount | ||
| return 0, 0, fmt.Errorf("deleted %d out of %d objects, first error: %w", success, all, firstErr) | ||
| return 0, 0, nil, fmt.Errorf("deleted %d out of %d objects, first error: %w", success, all, firstErr) | ||
| } | ||
|
|
||
| if rawDeleted > 0 { | ||
| err := db.updateCounter(tx, phy, rawDeleted, false) | ||
| if err != nil { | ||
| return 0, 0, fmt.Errorf("could not decrease phy object counter: %w", err) | ||
| return 0, 0, nil, fmt.Errorf("could not decrease phy object counter: %w", err) | ||
| } | ||
| } | ||
|
|
||
| if availableDeleted > 0 { | ||
| err := db.updateCounter(tx, logical, availableDeleted, false) | ||
| if err != nil { | ||
| return 0, 0, fmt.Errorf("could not decrease logical object counter: %w", err) | ||
| return 0, 0, nil, fmt.Errorf("could not decrease logical object counter: %w", err) | ||
| } | ||
| } | ||
|
|
||
| return rawDeleted, availableDeleted, nil | ||
| return rawDeleted, availableDeleted, removedObjs, nil | ||
| } | ||
|
|
||
| // delete removes object indexes from the metabase. | ||
|
|
@@ -180,3 +197,94 @@ func delUniqueIndexes(tx *bbolt.Tx, cnr cid.ID, oID oid.ID) error { | |
|
|
||
| return nil | ||
| } | ||
|
|
||
| // forms list of objects from addrs and their missing parts. | ||
carpawell marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // [RemovedObject.PayloadLen] is not initialized. | ||
| func supplementRemovedObjects(tx *bbolt.Tx, addrs []oid.Address) ([]RemovedObject, error) { | ||
| cnrMetaBktKey := make([]byte, 1+cid.Size) | ||
| cnrMetaBktKey[0] = metadataPrefix | ||
|
|
||
| res := make([]RemovedObject, len(addrs)) | ||
| for i := range addrs { | ||
| res[i].Address = addrs[i] | ||
| } | ||
|
|
||
| slices.SortFunc(res, func(a, b RemovedObject) int { | ||
| ac, bc := a.Address.Container(), b.Address.Container() | ||
| return bytes.Compare(ac[:], bc[:]) | ||
| }) | ||
|
|
||
| var err error | ||
| var cnrMetaBkt *bbolt.Bucket | ||
| var cnrMetaCrs *bbolt.Cursor | ||
| for i := range res { | ||
| cnr := res[i].Address.Container() | ||
|
|
||
| if i == 0 || cnr != res[i-1].Address.Container() { | ||
| copy(cnrMetaBktKey[1:], cnr[:]) | ||
|
|
||
| cnrMetaBkt = tx.Bucket(cnrMetaBktKey) | ||
| if cnrMetaBkt == nil { | ||
| continue | ||
| } | ||
| cnrMetaCrs = cnrMetaBkt.Cursor() | ||
| } else if cnrMetaBkt == nil { | ||
| continue | ||
| } | ||
|
|
||
| res, err = supplementRemovedECParts(res, cnrMetaBkt, cnrMetaCrs, addrs, res[i].Address) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("collect EC parts for %s: %w", res[i].Address, err) | ||
| } | ||
| } | ||
|
|
||
| return res, nil | ||
| } | ||
|
|
||
| // extends res with EC parts of addr which are not in addrs and returns updated res. | ||
| func supplementRemovedECParts(res []RemovedObject, cnrMetaBkt *bbolt.Bucket, cnrMetaCrs *bbolt.Cursor, addrs []oid.Address, addr oid.Address) ([]RemovedObject, error) { | ||
| cnr := addr.Container() | ||
| parent := addr.Object() | ||
| pref := slices.Concat([]byte{metaPrefixAttrIDPlain}, []byte(object.FilterParentID), objectcore.MetaAttributeDelimiter, | ||
| parent[:], objectcore.MetaAttributeDelimiter, | ||
| ) | ||
|
|
||
| var partCrs *bbolt.Cursor | ||
| var ecPref []byte | ||
| for k, _ := cnrMetaCrs.Seek(pref); ; k, _ = cnrMetaCrs.Next() { | ||
| partID, ok := bytes.CutPrefix(k, pref) | ||
| if !ok { | ||
| break | ||
| } | ||
| if len(partID) != oid.Size { | ||
| return nil, invalidMetaBucketKeyErr(k, fmt.Errorf("wrong OID len %d", len(partID))) | ||
| } | ||
| if islices.AllZeros(partID) { | ||
| return nil, invalidMetaBucketKeyErr(k, oid.ErrZero) | ||
| } | ||
|
|
||
| if partCrs == nil { | ||
| partCrs = cnrMetaBkt.Cursor() | ||
| } | ||
|
|
||
| if ecPref == nil { | ||
| ecPref = slices.Concat([]byte{metaPrefixIDAttr}, partID, []byte(iec.AttributePrefix)) // any of EC attributes | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you, please, recall me, if it is possible to PUT some object with this attribute, mentioning some different object as a parent? will it be accidentally dropped too with this code?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Each EC part has parent header inside, so consistency can be checked.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i meant if it is possible to misuse this code with |
||
| } else { | ||
| copy(ecPref[1:], partID) | ||
| } | ||
|
|
||
| if k, _ = partCrs.Seek(ecPref); !bytes.HasPrefix(k, ecPref) { | ||
| continue | ||
| } | ||
|
|
||
| id := oid.ID(partID) | ||
|
|
||
| if !slices.ContainsFunc(addrs, func(addr oid.Address) bool { return addr.Container() == cnr && addr.Object() == id }) { | ||
| res = append(res, RemovedObject{ | ||
| Address: oid.NewAddress(cnr, id), | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| return res, nil | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
too many vars with confusing names to me. add suffixes like "count", "objects", etc?