Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
0b91b6b
functionality working
shubhangi-google Feb 13, 2025
63b3af0
adding unit test cases
shubhangi-google Feb 25, 2025
308ef36
fix typo
shubhangi-google Feb 25, 2025
8768271
returning upload_url will error
shubhangi-google Feb 27, 2025
cb561a4
fix typo
shubhangi-google Feb 27, 2025
1d26f0d
removing
shubhangi-google Feb 27, 2025
fd8b6fd
removing pry
shubhangi-google Feb 27, 2025
02be397
fix typo
shubhangi-google Feb 27, 2025
5daa50a
changing approach
shubhangi-google Mar 10, 2025
fa2b1ae
adding upload_id param
shubhangi-google Mar 10, 2025
e891f3d
updating
shubhangi-google Mar 10, 2025
0cc8b2b
code refactoring
shubhangi-google Mar 13, 2025
de23583
fixing comment
shubhangi-google Mar 13, 2025
872802b
wip - implementation 2
shubhangi-google Apr 21, 2025
b92dd7f
new changes working
shubhangi-google Apr 24, 2025
6fbf86a
rewriting unit test cases
shubhangi-google Apr 29, 2025
fa884a8
adding test cases
shubhangi-google Apr 30, 2025
8d956f2
fix typo
shubhangi-google Apr 30, 2025
a1df390
fix typo
shubhangi-google Apr 30, 2025
ebb5d6c
removing unwanted changes
shubhangi-google Apr 30, 2025
6fc5733
fixing PR comments
shubhangi-google May 7, 2025
422dc99
Update google-apis-core/lib/google/apis/core/storage_upload.rb
shubhangi-google May 13, 2025
238a1e2
:q
shubhangi-google May 13, 2025
89e281d
updating requests
shubhangi-google May 13, 2025
53b8d50
Update google-apis-core/lib/google/apis/core/storage_upload.rb
shubhangi-google May 19, 2025
0a9bb3f
Update base_service.rb
shubhangi-google May 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions google-apis-core/lib/google/apis/core/base_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,36 @@ def verify_universe_domain!
true
end

# Restarts An interrupted Resumable upload
# @param [String] bucket
# Name of the bucket where the upload is being performed.
# @param [IO, String] upload_source
# IO stream or filename containing content to upload
# @param [IO, String] upload_id
# unique id generated for an ongoing upload

def restart_resumable_upload(bucket, upload_source, upload_id, options: nil)
command = make_storage_upload_command(:put, 'b/{bucket}/o', options)
command.upload_source = upload_source
command.upload_id = upload_id
command.params['bucket'] = bucket unless bucket.nil?
execute_or_queue_command(command)
end

# Deletes An interrupted Resumable upload
# @param [String] bucket
# Name of the bucket where the upload is being performed.
# @param [IO, String] upload_id
# unique id generated for an ongoing upload

def delete_resumable_upload(bucket, upload_id, options: nil)
command = make_storage_upload_command(:delete, 'b/{bucket}/o', options)
command.upload_id = upload_id
command.params['bucket'] = bucket unless bucket.nil?
command.delete_upload = options[:delete_upload] unless options[:delete_upload].nil?
execute_or_queue_command(command)
end

protected

# Create a new upload command.
Expand Down
95 changes: 92 additions & 3 deletions google-apis-core/lib/google/apis/core/storage_upload.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ class StorageUploadCommand < ApiCommand
# @return [Integer]
attr_accessor :upload_chunk_size

# Unique upload_id of a resumable upload
# @return [String]
attr_accessor :upload_id

# Boolean Value to specify is a resumable upload is to be deleted or not
# @return [Boolean]
attr_accessor :delete_upload

# Ensure the content is readable and wrapped in an IO instance.
#
# @return [void]
Expand All @@ -61,7 +69,6 @@ def prepare!
# asserting that it already has a body. Form encoding is never used
# by upload requests.
self.body = '' unless self.body

super
if streamable?(upload_source)
self.upload_io = upload_source
Expand All @@ -73,14 +80,16 @@ def prepare!
self.upload_content_type = type&.content_type
end
@close_io_on_finish = true
elsif !upload_id.nil? && delete_upload
@close_io_on_finish = false
else
fail Google::Apis::ClientError, 'Invalid upload source'
end
end

# Close IO stream when command done. Only closes the stream if it was opened by the command.
def release!
upload_io.close if @close_io_on_finish
upload_io.close if @close_io_on_finish && !upload_io.nil?
end

# Execute the command, retrying as necessary
Expand All @@ -96,8 +105,16 @@ def execute(client)
prepare!
opencensus_begin_span
@upload_chunk_size = options.upload_chunk_size
if upload_id.nil?
res = do_retry :initiate_resumable_upload, client
elsif delete_upload && !upload_id.nil?
construct_resumable_upload_url upload_id
res = do_retry :cancel_resumable_upload, client
else
construct_resumable_upload_url upload_id
res = do_retry :reinitiate_resumable_upload, client
end

do_retry :initiate_resumable_upload, client
while @upload_incomplete
res = do_retry :send_upload_command, client
end
Expand Down Expand Up @@ -131,6 +148,22 @@ def initiate_resumable_upload(client)
error(e, rethrow: true)
end

# Reinitiating resumable upload
def reinitiate_resumable_upload(client)
logger.debug { sprintf('Restarting resumable upload command to %s', url) }
check_resumable_upload client
upload_io.pos = @offset
end

# Making resumable upload url from upload_id
def construct_resumable_upload_url(upload_id)
query_params = query.dup
query_params['uploadType'] = RESUMABLE
query_params['upload_id'] = upload_id
resumable_upload_params = query_params.map { |key, value| "#{key}=#{value}" }.join('&')
@upload_url = "#{url}&#{resumable_upload_params}"
end

# Send the actual content
#
# @param [HTTPClient] client
Expand Down Expand Up @@ -160,6 +193,9 @@ def send_upload_command(client)
@offset += current_chunk_size if @upload_incomplete
success(result)
rescue => e
logger.warn {
"error occured please use uploadId-#{response.headers['X-GUploader-UploadID']} to resume your upload"
} unless response.nil?
upload_io.pos = @offset
error(e, rethrow: true)
end
Expand All @@ -182,6 +218,59 @@ def process_response(status, header, body)
super(status, header, body)
end

def check_resumable_upload(client)
# Setting up request header
request_header = header.dup
request_header[CONTENT_RANGE_HEADER] = "bytes */#{upload_io.size}"
request_header[CONTENT_LENGTH_HEADER] = '0'
# Initiating call
response = client.put(@upload_url, header: request_header, follow_redirect: true)
handle_resumable_upload_http_response_codes(response)
end

# Cancel resumable upload
def cancel_resumable_upload(client)
# Setting up request header
request_header = header.dup
request_header[CONTENT_LENGTH_HEADER] = '0'
# Initiating call
response = client.delete(@upload_url, header: request_header, follow_redirect: true)
handle_resumable_upload_http_response_codes(response)

if !@upload_incomplete && (400..499).include?(response.code.to_i)
@close_io_on_finish = true
true # method returns true if upload is successfully cancelled
else
logger.debug { sprintf("Failed to cancel upload session. Response: #{response.code} - #{response.body}") }
end

end

def handle_resumable_upload_http_response_codes(response)
code = response.code.to_i

case code
when 308
if response.headers['Range']
range = response.headers['Range']
@offset = range.split('-').last.to_i + 1
logger.debug { sprintf("Upload is incomplete. Bytes uploaded so far: #{range}") }
else
logger.debug { sprintf('No bytes uploaded yet.') }
end
@upload_incomplete = true
when 400..499
# Upload is canceled
@upload_incomplete = false
when 200, 201
# Upload is complete.
@upload_incomplete = false
else
logger.debug { sprintf("Unexpected response: #{response.code} - #{response.body}") }
@upload_incomplete = true
end
end

def streamable?(upload_source)
upload_source.is_a?(IO) || upload_source.is_a?(StringIO) || upload_source.is_a?(Tempfile)
end
Expand Down
3 changes: 2 additions & 1 deletion google-apis-core/lib/google/apis/options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ module Apis
:quota_project,
:query,
:add_invocation_id_header,
:upload_chunk_size)
:upload_chunk_size
)

# General client options
class ClientOptions
Expand Down
89 changes: 89 additions & 0 deletions google-apis-core/spec/google/apis/core/service_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,95 @@
include_examples 'with options'
end

context 'when making restart resumable upload' do
let(:bucket_name) { 'test_bucket' }
let(:file) { StringIO.new('Hello world' * 3) }

let(:upload_id) { 'foo' }
let(:command) do
service.send(
:restart_resumable_upload,
bucket_name, file, upload_id,
options: { upload_chunk_size: 11}
)
end
let(:upload_url) { "https://www.googleapis.com/upload/b/#{bucket_name}/o?uploadType=resumable&upload_id=#{upload_id}"}
context 'should complete the upload' do
before(:example) do
stub_request(:put, upload_url)
.with(
headers: {
'Content-Length' => '0',
'Content-Range' => 'bytes */33'
}
)
.to_return(
status: [308, 'Resume Incomplete'],
headers: { 'Range' => 'bytes=0-21' }
)
end

before(:example) do
stub_request(:put, upload_url)
.with(headers: { 'Content-Range' => 'bytes 22-32/33' })
.to_return(body: %(OK))
end

it 'should send request to upload url multiple times' do
command
expect(a_request(:put, upload_url)).to have_been_made.twice
end
end
context 'not restart resumable upload if upload is completed' do
before(:example) do
stub_request(:put, upload_url)
.with(
headers: {
'Content-Length' => '0',
'Content-Range' => 'bytes */33'
}
)
.to_return(status: 200, headers: { 'Range' => 'bytes=0-32' })
end

before(:example) do
stub_request(:put, upload_url)
.with(headers: { 'Content-Range' => 'bytes */33' })
.to_return(status: 200)
end

it 'should not restart a upload' do
command
expect(a_request(:put, upload_url)).to have_been_made
end
end
end

context 'delete resumable upload with upload_id' do
let(:bucket_name) { 'test_bucket' }
let(:upload_id) { 'foo' }
let(:command) do
service.send(
:delete_resumable_upload,
bucket_name, upload_id,
options: { upload_chunk_size: 11, delete_upload: true }
)
end

let(:upload_url) { "https://www.googleapis.com/upload/b/#{bucket_name}/o?uploadType=resumable&upload_id=#{upload_id}" }
before(:example) do
stub_request(:delete, upload_url)
.with(headers: { 'Content-Length' => '0' })
.to_return(status: [499])
end

it 'should cancel a resumable upload' do
command
expect(a_request(:delete, upload_url)).to have_been_made
expect(command).to be_truthy
end
end

context 'with batch' do
before(:example) do
response = <<EOF.gsub(/\n/, "\r\n")
Expand Down
Loading