Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 77 additions & 44 deletions lib/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ import { match } from 'ts-pattern'
import { getDatabase } from './db'
import { env } from './env'
import { generateNumberId } from './helpers'
import { logger } from './logger'

export class ObjectNotFoundError extends Error {
constructor(objectName: string) {
super(`Object not found in storage: ${objectName}`)
this.name = 'ObjectNotFoundError'
}
}

export class Storage {
adapter
Expand Down Expand Up @@ -208,22 +216,24 @@ export class Storage {
.where('id', '=', storageLocation.id)
.execute()

if (storageLocation.mergedAt || storageLocation.mergeStartedAt)
return this.downloadFromCacheEntryLocation(storageLocation)
try {
if (storageLocation.mergedAt || storageLocation.mergeStartedAt)
return await this.downloadFromCacheEntryLocation(storageLocation)

await this.db
.updateTable('storage_locations')
.set({
mergeStartedAt: Date.now(),
})
.where('id', '=', storageLocation.id)
.execute()
await this.ensurePartsExist(storageLocation)

const responseStream = new PassThrough()
const mergerStream = new PassThrough()
await this.db
.updateTable('storage_locations')
.set({
mergeStartedAt: Date.now(),
})
.where('id', '=', storageLocation.id)
.execute()

try {
const promise = this.adapter
const responseStream = new PassThrough()
const mergerStream = new PassThrough()

const mergePromise = this.adapter
.uploadStream(`${storageLocation.folderName}/merged`, mergerStream)
.then(async () => {
await this.db
Expand Down Expand Up @@ -255,31 +265,36 @@ export class Storage {
.execute()
mergerStream.destroy()
})
this.mergeStreamPromises.add(promise)
promise.finally(() => this.mergeStreamPromises.delete(promise))
this.mergeStreamPromises.add(mergePromise)
mergePromise.finally(() => this.mergeStreamPromises.delete(mergePromise))

this.pumpPartsToStreams(storageLocation, responseStream, mergerStream).catch((err) => {
responseStream.destroy(err)
mergerStream.destroy(err)
if (err instanceof ObjectNotFoundError)
logger.warn(`Stale cache entry ${cacheEntryId}: ${err.message}`)
})

return responseStream
} catch (err) {
await this.db
.updateTable('storage_locations')
.set({
mergedAt: null,
mergeStartedAt: null,
})
.where('id', '=', storageLocation.id)
.execute()
if (err instanceof ObjectNotFoundError) {
logger.warn(`Stale cache entry ${cacheEntryId}: ${err.message}`)
return
}
throw err
}
}

this.pumpPartsToStreams(storageLocation, responseStream, mergerStream).catch((err) => {
responseStream.destroy(err)
mergerStream.destroy(err)
})

return responseStream
private async ensurePartsExist(location: StorageLocation) {
const partsFolder = `${location.folderName}/parts`
const actualPartCount = await this.adapter.countFilesInFolder(partsFolder)
if (actualPartCount < location.partCount) throw new ObjectNotFoundError(partsFolder)
}

private async downloadFromCacheEntryLocation(location: StorageLocation) {
if (location.mergedAt) return this.adapter.createDownloadStream(`${location.folderName}/merged`)

await this.ensurePartsExist(location)
return Readable.from(this.streamParts(location))
}

Expand Down Expand Up @@ -522,15 +537,20 @@ class S3Adapter implements StorageAdapter {
}

async createDownloadStream(objectName: string) {
const response = await this.s3.send(
new GetObjectCommand({
Bucket: this.bucket,
Key: `${this.keyPrefix}/${objectName}`,
}),
)
if (!response.Body) throw new Error('No body in S3 get object response')
try {
const response = await this.s3.send(
new GetObjectCommand({
Bucket: this.bucket,
Key: `${this.keyPrefix}/${objectName}`,
}),
)
if (!response.Body) throw new Error('No body in S3 get object response')

return response.Body as Readable
return response.Body as Readable
} catch (err: any) {
if (err.name === 'NoSuchKey') throw new ObjectNotFoundError(objectName)
throw err
}
}

async deleteFolder(folderName: string) {
Expand Down Expand Up @@ -629,7 +649,13 @@ class FileSystemAdapter implements StorageAdapter {
}

async createDownloadStream(objectName: string) {
return createReadStream(path.join(this.rootFolder, objectName))
const filePath = path.join(this.rootFolder, objectName)
try {
await fs.access(filePath)
} catch {
throw new ObjectNotFoundError(objectName)
}
return createReadStream(filePath)
}

async deleteFolder(folderName: string) {
Expand All @@ -656,11 +682,15 @@ class FileSystemAdapter implements StorageAdapter {
}

async countFilesInFolder(folderName: string) {
const dir = await fs.readdir(path.join(this.rootFolder, folderName), {
withFileTypes: true,
})

return dir.filter((item) => item.isFile()).length
try {
const dir = await fs.readdir(path.join(this.rootFolder, folderName), {
withFileTypes: true,
})
return dir.filter((item) => item.isFile()).length
} catch (err: any) {
if (err.code === 'ENOENT') return 0
throw err
}
}
}

Expand Down Expand Up @@ -690,7 +720,10 @@ class GcsAdapter implements StorageAdapter {
}

async createDownloadStream(objectName: string) {
return this.bucket.file(`${this.keyPrefix}/${objectName}`).createReadStream()
const file = this.bucket.file(`${this.keyPrefix}/${objectName}`)
const [exists] = await file.exists()
if (!exists) throw new ObjectNotFoundError(objectName)
return file.createReadStream()
}

async deleteFolder(folderName: string) {
Expand Down
76 changes: 76 additions & 0 deletions tests/stale-cache.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import crypto from 'node:crypto'
import fs from 'node:fs/promises'
import path from 'node:path'

import { restoreCache, saveCache } from '@actions/cache'
import { SignJWT } from 'jose'
import { afterAll, beforeAll, describe, expect, test } from 'vitest'
import { Storage } from '~/lib/storage'
import { TEST_TEMP_DIR } from './setup'

const testFilePath = path.join(TEST_TEMP_DIR, 'test-stale.bin')

describe('stale cache entry handling (missing storage objects)', () => {
let adapter: Awaited<ReturnType<typeof Storage.getAdapterFromEnv>>

beforeAll(async () => {
process.env.ACTIONS_CACHE_SERVICE_V2 = 'true'
process.env.ACTIONS_RUNTIME_TOKEN = await new SignJWT({
ac: JSON.stringify([{ Scope: 'refs/heads/main', Permission: 3 }]),
repository_id: '123',
})
.setProtectedHeader({ alg: 'HS256' })
.sign(crypto.createSecretKey('mock-secret-key', 'ascii'))

adapter = await Storage.getAdapterFromEnv()
})
afterAll(() => {
delete process.env.ACTIONS_CACHE_SERVICE_V2
delete process.env.ACTIONS_RUNTIME_TOKEN
})

test(
'returns cache miss when parts are wiped before first download (unmerged entry)',
{ timeout: 30_000 },
async () => {
const contents = crypto.randomBytes(1024)
await fs.writeFile(testFilePath, contents)
await saveCache([testFilePath], 'stale-fresh-key')
await fs.rm(testFilePath)

await adapter.clear()

const missKey = await restoreCache([testFilePath], 'stale-fresh-key')
expect(missKey).toBeUndefined()

const missKey2 = await restoreCache([testFilePath], 'stale-fresh-key')
expect(missKey2).toBeUndefined()
},
)

test(
'returns cache miss when the merged blob is wiped after merge completes',
{ timeout: 30_000 },
async () => {
const contents = crypto.randomBytes(1024)
await fs.writeFile(testFilePath, contents)
await saveCache([testFilePath], 'stale-merged-key')
await fs.rm(testFilePath)

const hitKey = await restoreCache([testFilePath], 'stale-merged-key')
expect(hitKey).toBe('stale-merged-key')
await fs.rm(testFilePath)

// Wait for the background merge to flush before wiping storage.
await new Promise((resolve) => setTimeout(resolve, 2000))

await adapter.clear()

const missKey = await restoreCache([testFilePath], 'stale-merged-key')
expect(missKey).toBeUndefined()

const missKey2 = await restoreCache([testFilePath], 'stale-merged-key')
expect(missKey2).toBeUndefined()
},
)
})
Loading