Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .jules/bolt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## 2024-04-01 - Concurrency actor vs struct issue
**Learning:** `actor` isolates state to a single execution context. Using `withTaskGroup` inside an `actor` limits the concurrency execution as all child tasks inherit the actor's context and run sequentially instead of in parallel. We can optimize CPU/I/O intensive tasks by making `actor` components `struct` when they are stateless and only rely on thread-safe dependencies. Also, batching operations without releasing the task group limits parallelism. By using a sliding window for task queuing, we can maintain concurrency limits and ensure maximum resource utilization.
**Action:** Use `struct` for parallel scanning when state isolation is unnecessary. When using `withTaskGroup` for high-volume tasks, use a sliding window instead of static chunking to maximize concurrent execution.
44 changes: 26 additions & 18 deletions Sources/Cacheout/Memory/ProcessMemoryScanner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -97,30 +97,38 @@ actor ProcessMemoryScanner {
///
/// Returns the collected entries and the count of EPERM failures.
private func scanPIDs(_ pids: [pid_t]) async -> (entries: [ProcessEntryDTO], epermCount: Int) {
// Chunk PIDs to cap concurrency at maxConcurrency.
let chunks = stride(from: 0, to: pids.count, by: maxConcurrency).map {
Array(pids[$0..<min($0 + maxConcurrency, pids.count)])
}

var allEntries: [ProcessEntryDTO] = []
var totalEperm = 0

for chunk in chunks {
await withTaskGroup(of: ScanPIDResult.self) { group in
for pid in chunk {
await withTaskGroup(of: ScanPIDResult.self) { group in
var activeTasks = 0
var pidIterator = pids.makeIterator()

// Enqueue initial batch of tasks up to maxConcurrency
while activeTasks < maxConcurrency, let pid = pidIterator.next() {
group.addTask { [self] in
self.scanSinglePID(pid)
}
activeTasks += 1
}

// Process results and enqueue new tasks as old ones finish
for await result in group {
activeTasks -= 1
switch result {
case .success(let entry):
allEntries.append(entry)
case .eperm:
totalEperm += 1
case .otherError:
break
}

if let pid = pidIterator.next() {
group.addTask { [self] in
self.scanSinglePID(pid)
}
}
for await result in group {
switch result {
case .success(let entry):
allEntries.append(entry)
case .eperm:
totalEperm += 1
case .otherError:
break
}
activeTasks += 1
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/Cacheout/Scanner/CacheScanner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import Foundation

actor CacheScanner {
struct CacheScanner {
private let fileManager = FileManager.default

func scanAll(_ categories: [CacheCategory]) async -> [ScanResult] {
Expand Down
2 changes: 1 addition & 1 deletion Sources/Cacheout/Scanner/NodeModulesScanner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

import Foundation

actor NodeModulesScanner {
struct NodeModulesScanner {
private let fileManager = FileManager.default

/// Common directories where developers keep projects
Expand Down