Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions .github/workflows/elixir.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ jobs:
steps:
- name: Install MSSql Client Tools
run: |
curl https://packages.microsoft.com/keys/microsoft.asc | sudo tee /etc/apt/trusted.gpg.d/microsoft.asc
curl https://packages.microsoft.com/config/ubuntu/22.04/prod.list | sudo tee /etc/apt/sources.list.d/mssql-release.list
sudo rm -f /etc/apt/sources.list.d/microsoft-prod.list
curl https://packages.microsoft.com/keys/microsoft.asc | sudo gpg --batch --yes --dearmor -o /usr/share/keyrings/microsoft-prod.gpg
echo "deb [signed-by=/usr/share/keyrings/microsoft-prod.gpg arch=amd64,arm64,armhf] https://packages.microsoft.com/ubuntu/22.04/prod jammy main" | sudo tee /etc/apt/sources.list.d/mssql-release.list
sudo apt-get update
sudo apt-get install mssql-tools18 unixodbc-dev
sudo ACCEPT_EULA=Y apt-get install -y mssql-tools18 unixodbc-dev

- uses: actions/checkout@v2

Expand Down
94 changes: 94 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,100 @@ iex> Tds.query!(pid, "INSERT INTO MyTable (MyColumn) VALUES (@my_value)",

* Automatic decoding and encoding of Elixir values to and from MSSQL's binary format
* Support of TDS Versions 7.3, 7.4
* Streaming/cursor support via `Tds.stream/4` and `DBConnection.stream/4`

## Streaming

Tds supports streaming large result sets using SQL Server server-side cursors. This allows you to process rows in batches without loading the entire result set into memory. Streaming requires an active transaction since SQL Server cursors must operate within a transaction context.

### Basic Usage

```elixir
Tds.transaction(pid, fn conn ->
stream = Tds.stream(conn, "SELECT id, name FROM users ORDER BY id", [])
results = Enum.to_list(stream)

# Process each batch
for %Tds.Result{rows: rows} <- results do
for row <- rows do
IO.inspect(row)
end
end
end)
```

### Chunked Streaming with `max_rows`

Control the number of rows fetched per batch with the `:max_rows` option (defaults to 500):

```elixir
Tds.transaction(pid, fn conn ->
stream = Tds.stream(conn, "SELECT id, name FROM users ORDER BY id", [], max_rows: 100)
results = Enum.to_list(stream)
end)
```

### Parameterized Queries

Streaming works with parameterized queries. When parameters are provided, Tds automatically uses `PARAMETERIZED_STMT` mode for the cursor:

```elixir
Tds.transaction(pid, fn conn ->
stream = Tds.stream(
conn,
"SELECT id, name FROM users WHERE id > @1 ORDER BY id",
[%Tds.Parameter{name: "@1", type: :integer, value: 100}]
)
results = Enum.to_list(stream)
end)
```

### Using `DBConnection.stream/4` Directly

You can also use the `DBConnection.stream/4` function with a prepared query:

```elixir
Tds.transaction(pid, fn conn ->
{:ok, query} = Tds.prepare(conn, "SELECT id FROM users ORDER BY id")
stream = DBConnection.stream(conn, query, [], max_rows: 100)
results = Enum.to_list(stream)
# ...
end)
```

### Lazy Enumeration with `Stream.map`

Use `Stream.map` to process rows lazily without loading all batches:

```elixir
Tds.transaction(pid, fn conn ->
stream = Tds.stream(conn, "SELECT id FROM users ORDER BY id", [], max_rows: 50)
stream
|> Stream.map(fn %Tds.Result{rows: rows} -> rows end)
|> Enum.take(3) # Only processes the first 3 batches
end)
```

### Ecto Integration

When using `Ecto.Adapters.Tds` (from `ecto_sql`), `Repo.stream/2` works out of the box since it delegates to `DBConnection.stream/4`:

```elixir
Repo.transaction(fn ->
Repo.stream(User, max_rows: 500)
|> Enum.to_list()
end)
```

### Technical Details

Streaming is implemented via the TDS cursor stored procedures:

- `sp_cursoropen` (ProcID 2) — Opens a FORWARD_ONLY, READ_ONLY cursor
- `sp_cursorfetch` (ProcID 7) — Fetches the next batch of rows (NEXT fetch type)
- `sp_cursorclose` (ProcID 9) — Closes and deallocates the cursor

For parameterized queries, the cursor uses `PARAMETERIZED_STMT` mode (scrollopt flag `0x1004`) which allows passing query parameters via `@params` (similar to `sp_executesql`).

## Configuration

Expand Down
6 changes: 6 additions & 0 deletions lib/tds.ex
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ defmodule Tds do
DBConnection.transaction(conn, fun, opts)
end

@spec stream(conn, iodata, list, [execute_option]) :: DBConnection.Stream.t()
def stream(conn, statement, params \\ [], opts \\ []) do
query = %Query{statement: statement}
DBConnection.stream(conn, query, params, opts)
end

@spec rollback(DBConnection.t(), reason :: any) :: no_return
defdelegate rollback(conn, any), to: DBConnection

Expand Down
25 changes: 22 additions & 3 deletions lib/tds/messages.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ defmodule Tds.Messages do

## Microsoft Stored Procedures
# @tds_sp_cursor 1
# @tds_sp_cursoropen 2
@tds_sp_cursoropen 2
# @tds_sp_cursorprepare 3
# @tds_sp_cursorexecute 4
# @tds_sp_cursorprepexec 5
# @tds_sp_cursorunprepare 6
# @tds_sp_cursorfetch 7
@tds_sp_cursorfetch 7
# @tds_sp_cursoroption 8
# @tds_sp_cursorclose 9
@tds_sp_cursorclose 9
@tds_sp_executesql 10
@tds_sp_prepare 11
@tds_sp_execute 12
Expand Down Expand Up @@ -186,6 +186,10 @@ defmodule Tds.Messages do
m = msg_result(m, params: [param | params])
{m, c, s}

{:returnvalue, param}, {msg_result(params: params) = m, c, s} ->
m = msg_result(m, params: [param | params])
{m, c, s}

{:returnstatus, status}, {msg_result() = m, c, s} ->
m = msg_result(m, status: status)
{m, c, s}
Expand Down Expand Up @@ -401,6 +405,21 @@ defmodule Tds.Messages do
encode_rpc_params(params, "")
end

defp encode_rpc(:sp_cursoropen, params) do
<<0xFF, 0xFF, @tds_sp_cursoropen::little-size(2)-unit(8), 0x00, 0x00>> <>
encode_rpc_params(params, "")
end

defp encode_rpc(:sp_cursorfetch, params) do
<<0xFF, 0xFF, @tds_sp_cursorfetch::little-size(2)-unit(8), 0x00, 0x00>> <>
encode_rpc_params(params, "")
end

defp encode_rpc(:sp_cursorclose, params) do
<<0xFF, 0xFF, @tds_sp_cursorclose::little-size(2)-unit(8), 0x00, 0x00>> <>
encode_rpc_params(params, "")
end

# Finished processing params
defp encode_rpc_params([], ret), do: ret

Expand Down
160 changes: 153 additions & 7 deletions lib/tds/protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,41 @@ defmodule Tds.Protocol do
) ::
{:cont | :halt, Tds.Result.t(), new_state :: t()}
| {:error | :disconnect, Exception.t(), new_state :: t()}
def handle_fetch(_query, _cursor, _opts, state) do
{:error, Tds.Error.exception("Cursor is not supported by TDS"), state}
def handle_fetch(_query, cursor, opts, %{sock: _sock} = s) do
fetch_type = Keyword.get(opts, :fetch_type, 2)
max_rows = Keyword.get(opts, :max_rows, 500)

params = [
%Tds.Parameter{name: "@cursor", type: :integer, direction: :input, value: cursor},
%Tds.Parameter{name: "@fetchtype", type: :integer, direction: :input, value: fetch_type},
%Tds.Parameter{name: "@rownum", type: :integer, direction: :input, value: 0},
%Tds.Parameter{name: "@nrows", type: :integer, direction: :input, value: max_rows}
]

msg = msg_rpc(proc: :sp_cursorfetch, params: params)

Process.put(:resultset, false)

s = %{s | state: :executing}

case msg_send(msg, s) do
{:ok, %{result: %Tds.Result{num_rows: 0}} = s} ->
{:halt, %Tds.Result{columns: [], rows: [], num_rows: 0}, s}

{:ok, %{result: %Tds.Result{} = result} = s} ->
{:cont, result, s}

{:error, err, %{transaction: :started} = s} ->
{:error, err, %{s | transaction: :failed}}

{:error, err, s} ->
{:error, err, s}
end
rescue
exception ->
{:error, exception, s}
after
Process.delete(:resultset)
end

@spec handle_deallocate(
Expand All @@ -322,17 +355,123 @@ defmodule Tds.Protocol do
) ::
{:ok, Tds.Result.t(), new_state :: t()}
| {:error | :disconnect, Exception.t(), new_state :: t()}
def handle_deallocate(_query, _cursor, _opts, state) do
{:error, Tds.Error.exception("Cursor operations are not supported in TDS"), state}
def handle_deallocate(_query, cursor, _opts, %{sock: _sock} = s) do
params = [
%Tds.Parameter{name: "@cursor", type: :integer, direction: :input, value: cursor}
]

msg = msg_rpc(proc: :sp_cursorclose, params: params)

s = %{s | state: :executing}

case msg_send(msg, s) do
{:ok, %{result: result} = s} ->
{:ok, result, %{s | state: :ready}}

{:error, err, %{transaction: :started} = s} ->
{:error, err, %{s | transaction: :failed}}

{:error, err, s} ->
{:error, err, s}
end
end

@spec handle_declare(Query.t(), params :: any, opts :: Keyword.t(), state :: t) ::
{:ok, Query.t(), cursor :: any, new_state :: t}
| {:error | :disconnect, Exception.t(), new_state :: t}
def handle_declare(_query, _params, _opts, state) do
{:error, Tds.Error.exception("Cursor operations are not supported in TDS"), state}
def handle_declare(%Query{statement: statement} = query, params, opts, %{sock: _sock} = s) do
resultset? = Keyword.get(opts, :resultset, false)

prepared_params = Tds.Parameter.prepared_params(params)

cursor_param = %Tds.Parameter{
name: "@cursor",
type: :integer,
direction: :output,
value: nil
}

scrollopt =
if prepared_params != "" do
0x1004
else
0x0004
end

rpc_params =
[
cursor_param,
%Tds.Parameter{
name: "@stmt",
type: :string,
direction: :input,
value: statement
},
%Tds.Parameter{
name: "@scrollopt",
type: :integer,
direction: :input,
value: scrollopt
},
%Tds.Parameter{
name: "@ccopt",
type: :integer,
direction: :input,
value: 0x2001
},
%Tds.Parameter{
name: "@rowcount",
type: :integer,
direction: :output,
value: nil
}
] ++
if prepared_params != "" do
[
%Tds.Parameter{
name: "@params",
type: :string,
direction: :input,
value: prepared_params
}
] ++ Tds.Parameter.prepare_params(params)
else
[]
end

msg = msg_rpc(proc: :sp_cursoropen, params: rpc_params)

Process.put(:resultset, resultset?)

case msg_send(msg, %{s | state: :executing}) do
{:ok, %{result: %Tds.Result{} = result} = s} ->
Process.delete(:resultset)
cursor_handle = extract_cursor_handle(result)
{:ok, query, cursor_handle, %{s | state: :ready}}

{:error, err, %{transaction: :started} = s} ->
Process.delete(:resultset)
{:error, err, %{s | transaction: :failed}}

{:error, err, s} ->
Process.delete(:resultset)
{:error, err, s}
end
rescue
exception ->
Process.delete(:resultset)
{:error, exception, s}
end

defp extract_cursor_handle(%Tds.Result{params: params}) do
case Enum.find(params, &(&1.name == "@cursor" and &1.direction == :output)) do
%Tds.Parameter{value: cursor_id} when is_integer(cursor_id) -> cursor_id
_ -> nil
end
end

defp extract_cursor_handle(_), do: nil

# CONNECTION

defp instance(opts, s) do
Expand Down Expand Up @@ -762,7 +901,7 @@ defmodule Tds.Protocol do
|> send_query(state)
end

def message(:executing, msg_result(set: set), s) do
def message(:executing, msg_result(set: set, params: params), s) do
resultset? = Process.get(:resultset, false)

result =
Expand All @@ -772,6 +911,13 @@ defmodule Tds.Protocol do
{[h | _t], _false} -> h
end

result =
if result != nil and params != [] do
%{result | params: params}
else
result
end

{:ok, mark_ready(%{s | result: result})}
end

Expand Down
6 changes: 4 additions & 2 deletions lib/tds/result.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ defmodule Tds.Result do
* `rows`: The result set as a list of tuples. Each tuple corresponds to a
row, while each element in the tuple corresponds to a column.
* `num_rows`: The number of fetched or affected rows.
* `params`: Output parameters returned from stored procedures or cursors.
"""

@typedoc "The result of a database query."
@type t :: %__MODULE__{
columns: nil | [String.t()],
rows: nil | [[any()]],
num_rows: integer
num_rows: integer,
params: [Tds.Parameter.t()]
}

defstruct columns: nil, rows: nil, num_rows: 0
defstruct columns: nil, rows: nil, num_rows: 0, params: []

if Code.ensure_loaded?(Table.Reader) do
defimpl Table.Reader, for: Tds.Result do
Expand Down
Loading
Loading