Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# CHANGELOG

## v0.2.7 - Unreleased

* Fix `Collectable` implementation silently ignoring write errors.
* Fix `Collectable` implementation silently ignoring close errors.
* Fix `Enumerable` implementation crashing with `CaseClauseError` on read errors.
* Fix `Enumerable.count/1` returning invalid protocol value on error.
* Fix `Enumerable.slice/1` crashing with `MatchError` when object is invalid.
* Fix `Enumerable` slicing function ignoring seek/read errors.

## v0.2.6 - 2026-05-01

* Fix grammar and typos in the documentation
Expand Down
75 changes: 53 additions & 22 deletions lib/pg_large_objects/large_object.ex
Original file line number Diff line number Diff line change
Expand Up @@ -370,12 +370,22 @@ defimpl Collectable, for: PgLargeObjects.LargeObject do

collector = fn
lob, {:cont, data} ->
LargeObject.write(lob, data)
lob
case LargeObject.write(lob, data) do
:ok ->
lob

{:error, reason} ->
raise "failed to write to large object: #{inspect(reason)}"
end

lob, :done ->
LargeObject.close(lob)
lob
case LargeObject.close(lob) do
:ok ->
lob

{:error, reason} ->
raise "failed to close large object: #{inspect(reason)}"
end

_lob, :halt ->
:ok
Expand All @@ -393,44 +403,65 @@ defimpl Enumerable, for: PgLargeObjects.LargeObject do
case PgLargeObjects.LargeObject.read(lob, lob.bufsize) do
{:ok, ""} -> {:halt, lob}
{:ok, data} -> {[data], lob}
{:error, reason} -> raise "failed to read from large object: #{inspect(reason)}"
end
end

after_fun = fn lob ->
PgLargeObjects.LargeObject.close(lob)
try do
PgLargeObjects.LargeObject.close(lob)
rescue
_ -> :ok
end
end

Stream.resource(start_fun, next_fun, after_fun).(acc, fun)
end

def count(lob) do
with {:ok, size} <- PgLargeObjects.LargeObject.size(lob) do
{:ok, ceil(size / lob.bufsize)}
case PgLargeObjects.LargeObject.size(lob) do
{:ok, size} -> {:ok, ceil(size / lob.bufsize)}
{:error, _} -> {:error, __MODULE__}
end
end

def member?(_lob, _element), do: {:error, __MODULE__}

def slice(lob) do
slicing_fun = fn
start, length, 1 ->
PgLargeObjects.LargeObject.seek(lob, start * lob.bufsize)
case count(lob) do
{:ok, size} ->
{:ok, size, fn start, length, step -> slicing_fun(lob, start, length, step) end}

for _ <- 0..(length - 1) do
{:ok, data} = PgLargeObjects.LargeObject.read(lob, lob.bufsize)
data
end
{:error, _} ->
{:error, __MODULE__}
end
end

start, length, step ->
for i <- 0..(length - 1)//step do
PgLargeObjects.LargeObject.seek(lob, (start + i) * lob.bufsize)
{:ok, data} = PgLargeObjects.LargeObject.read(lob, lob.bufsize)
data
end
defp slicing_fun(lob, start, length, 1) do
case PgLargeObjects.LargeObject.seek(lob, start * lob.bufsize) do
{:ok, _} -> :ok
{:error, reason} -> raise "failed to seek in large object: #{inspect(reason)}"
end

{:ok, size} = count(lob)
for _ <- 0..(length - 1) do
case PgLargeObjects.LargeObject.read(lob, lob.bufsize) do
{:ok, data} -> data
{:error, reason} -> raise "failed to read from large object: #{inspect(reason)}"
end
end
end

{:ok, size, slicing_fun}
defp slicing_fun(lob, start, length, step) do
for i <- 0..(length - 1)//step do
case PgLargeObjects.LargeObject.seek(lob, (start + i) * lob.bufsize) do
{:ok, _} -> :ok
{:error, reason} -> raise "failed to seek in large object: #{inspect(reason)}"
end

case PgLargeObjects.LargeObject.read(lob, lob.bufsize) do
{:ok, data} -> data
{:error, reason} -> raise "failed to read from large object: #{inspect(reason)}"
end
end
end
end
93 changes: 93 additions & 0 deletions test/pg_large_objects/large_object_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,99 @@ defmodule PgLargeObjects.LargeObjectTest do
end
end

describe "Collectable implementation" do
test "raises when writing to a read-only object" do
oid = put_large_object!("hello")

TestRepo.transaction(fn ->
{:ok, lob} = LargeObject.open(TestRepo, oid, mode: :read)

assert_raise RuntimeError, ~r/failed to write to large object/, fn ->
["new data"]
|> Stream.into(lob)
|> Stream.run()
end
end)
end

test "raises when closing an already-closed object" do
oid = put_large_object!("hello")

TestRepo.transaction(fn ->
{:ok, lob} = LargeObject.open(TestRepo, oid, mode: :write)

# Close the fd manually so the Collectable's close on :done will fail
LargeObject.close(lob)

assert_raise RuntimeError, ~r/failed to close large object/, fn ->
# Empty stream: no writes, only :done triggers close
[]
|> Stream.into(lob)
|> Stream.run()
end
end)
end
end

describe "Enumerable implementation" do
test "raises on read error" do
oid = put_large_object!("hello")

TestRepo.transaction(fn ->
{:ok, lob} = LargeObject.open(TestRepo, oid)

# Delete the object so reads will fail
LargeObject.remove(TestRepo, lob.oid)

assert_raise RuntimeError, fn ->
Enum.to_list(lob)
end
end)
end

test "count/1 returns {:error, module} when object is invalid" do
oid = put_large_object!("hello")

TestRepo.transaction(fn ->
{:ok, lob} = LargeObject.open(TestRepo, oid)
LargeObject.remove(TestRepo, lob.oid)

result = Enumerable.count(lob)
assert result == {:error, Enumerable.PgLargeObjects.LargeObject}
end)
end

test "slice/1 returns {:error, module} when object is invalid" do
oid = put_large_object!("hello")

TestRepo.transaction(fn ->
{:ok, lob} = LargeObject.open(TestRepo, oid)
LargeObject.remove(TestRepo, lob.oid)

result = Enumerable.slice(lob)
assert result == {:error, Enumerable.PgLargeObjects.LargeObject}
end)
end

test "slice/1 raises on seek error" do
oid = put_large_object!("hello")

TestRepo.transaction(fn ->
{:ok, lob} = LargeObject.open(TestRepo, oid, bufsize: 2)

# Get slice function
{:ok, _size, slicing_fun} = Enumerable.slice(lob)

# Delete object so seek fails
LargeObject.remove(TestRepo, lob.oid)

assert_raise RuntimeError, fn ->
slicing_fun.(0, 1, 1)
end
end)
end
end

defp with_object(data, opts \\ [], fun) do
oid = put_large_object!(data)

Expand Down