Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 23 additions & 15 deletions src/main/java/dev/netcopy/server/tcp/TcpConnectionHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,18 @@
*
* <p>Conversation:
* <ol>
* <li>Read first frame, expect {@link Frame.Hello}. Validate {@code protoVer == 1} and the
* token via {@link TokenGate#matches(String)}. On failure: write {@link Frame.Err} with
* {@link FrameCodec#ERR_INCOMPATIBLE_VERSION} or {@link FrameCodec#ERR_UNAUTHORIZED} and
* close the connection.</li>
* <li>Reply with {@link Frame.HelloOk}.</li>
* <li>Read first frame, expect {@link Frame.Hello}. Reject {@code protoVer < 1} with
* {@link FrameCodec#ERR_INCOMPATIBLE_VERSION}; validate the token via
* {@link TokenGate#matches(String)}, reject mismatch with
* {@link FrameCodec#ERR_UNAUTHORIZED}. The negotiated version is
* {@code min(client.protoVer, SERVER_PROTO_VER)} and reported back in
* {@link Frame.HelloOk} so the client knows which trailer format to expect.</li>
* <li>Loop:
* <ul>
* <li>{@link Frame.Bye} or EOF — close gracefully.</li>
* <li>{@link Frame.Request} — resolve {@code manifestId} → {@link Manifest},
* {@code fileId} → entry, validate {@code offset}/{@code length}, hash the byte range,
* emit {@link Frame.DataHead} → N × {@link Frame.Data} → {@link Frame.DataEnd}.
* {@code fileId} → entry, validate {@code offset}/{@code length}, then stream the byte
* range using whichever variant the negotiated version selects (see below).
* Errors map to {@link FrameCodec#ERR_NOT_FOUND}/{@link FrameCodec#ERR_BAD_REQUEST}/
* {@link FrameCodec#ERR_INTERNAL}.</li>
* <li>Anything else — {@link Frame.Err} with {@link FrameCodec#ERR_BAD_REQUEST} and
Expand All @@ -44,14 +45,21 @@
* </li>
* </ol>
*
* <p>Hashing+streaming strategy: the byte range is read from disk in {@value #DATA_CHUNK_BYTES}
* chunks; each chunk is fed into a streaming {@link Xxh3Hasher} (one full pass through the
* bytes) to compute the canonical xxh3-128 of the range. Once the digest is known we write the
* {@link Frame.DataHead}, then we re-read the range and ship it as a sequence of
* {@link Frame.Data} frames followed by a single {@link Frame.DataEnd}. Two passes is the price
* of placing the hash <em>before</em> the data on the wire — the alternative (buffering the
* whole range in memory) would not scale to multi-GB files. The OS page cache absorbs the
* second read for any range that fits in RAM.
* <h2>Hashing + streaming variants</h2>
*
* <p><strong>v2 (default since v0.3.0)</strong> — single-pass. The byte range is read once in
* {@value #DATA_CHUNK_BYTES} chunks; every chunk is simultaneously written to the wire and
* fed through a streaming {@link Xxh3Hasher}. {@link Frame.DataHead} goes out first with an
* all-zero sentinel {@code xxh3} (the receiver ignores it on v2), DATA frames carry the body,
* and the real digest arrives in a trailing {@link Frame.DataEndV2}. One pass over the bytes
* — meaningful on cold-cache HDDs where the v1 second read paid full seek cost.
*
* <p><strong>v1 (legacy)</strong> — two passes. Read the range once into a streaming
* {@link Xxh3Hasher} to compute the digest; write {@link Frame.DataHead} carrying the digest;
* re-read the same range (typically from page cache) and ship it as a sequence of
* {@link Frame.Data} frames followed by {@link Frame.DataEnd}. Hash-before-body is wire-
* compatible with simpler clients but doubles disk read load on the source. Selected
* automatically when the client speaks v1 — old clients keep working.
*/
final class TcpConnectionHandler {

Expand Down
11 changes: 8 additions & 3 deletions src/main/java/dev/netcopy/transfer/Puller.java
Original file line number Diff line number Diff line change
Expand Up @@ -452,9 +452,14 @@ private Outcome processManifest(JobState job, Manifest manifest, BlobPuller pull
private void pullFile(JobState job, Manifest manifest, Manifest.Entry entry,
BlobPuller puller, Path targetBase) throws IOException, InterruptedException {

log.info("job {} pullFile: relPath={} size={} chunks={}",
job.id(), entry.relPath(), entry.size(),
entry.chunks() != null ? entry.chunks().size() : 0);
// Per-file trace at DEBUG so a 1k-file transfer doesn't drown INFO logs in
// 1k+ "pullFile relPath=..." lines. Job-level lifecycle events (start, pause,
// complete) keep INFO level so an operator can scan the log at a glance.
if (log.isDebugEnabled()) {
log.debug("job {} pullFile: relPath={} size={} chunks={}",
job.id(), entry.relPath(), entry.size(),
entry.chunks() != null ? entry.chunks().size() : 0);
}

Path targetAbs = targetBase.resolve(entry.relPath()).normalize();
Path parent = targetAbs.getParent();
Expand Down
10 changes: 9 additions & 1 deletion src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
<configuration>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} %-5level [%thread] %logger{20} - %msg%n</pattern>
<!--
ISO date in the timestamp so a `grep` across days isn't ambiguous
(HH:mm:ss alone repeats every 24h). Container log consumers like
Docker's json-file driver also stamp lines with the host's wall
clock, but those timestamps don't survive when the user pipes
logs through `kubectl logs --previous`, journalctl exports,
or the GitHub Actions UI's run logs.
-->
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%thread] %logger{20} - %msg%n</pattern>
</encoder>
</appender>

Expand Down
39 changes: 20 additions & 19 deletions tasks/contracts/data-formats.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Data formats — JSON-схемы
# Data formats — JSON schemas

Все REST-payload-ы и persisted JSON-файлы используют эти структуры. Изменения требуют согласования с тимлидом.
All REST payloads and persisted JSON files use these shapes. Changes require maintainer review.

> **Note for v0.4.0+:** persisted state files (`<state-dir>/jobs/*.json` и
> **Note for v0.4.0+:** persisted state files (`<state-dir>/jobs/*.json` and
> `<file>.netcopy/meta.json`) carry a `schemaVersion` field. Readers MUST refuse
> any file with `schemaVersion > CURRENT_SCHEMA_VERSION` to avoid
> mis-interpreting a future format. Pre-v0.4.0 files have no field — Jackson
Expand Down Expand Up @@ -53,7 +53,7 @@ Response (`Manifest`):
}
```

`type ∈ {"file", "dir", "symlink"}`. Только `file` имеет `size`/`mtime`/`chunks`. Только `symlink` имеет `target`.
`type ∈ {"file", "dir", "symlink"}`. Only `file` carries `size` / `mtime` / `chunks`. Only `symlink` carries `target`.

---

Expand Down Expand Up @@ -253,8 +253,8 @@ Written exactly once on sidecar creation with `CREATE_NEW + force(true)`.

## Sidecar — `<file>.netcopy/chunks.bitmap`

Binary. `chunkCount` бит, padded до байта. Bit i = 1 если chunk[i] завершён и
его XXH3-128 hash проверен.
Binary. `chunkCount` bits, padded up to a whole byte. Bit `i` is `1` once
chunk `i` has been written, fsynced, and its XXH3-128 hash verified.

Updated in place via positional `FileChannel.write(buf, 0)`. The bitmap is
idempotent — a torn write at most loses bits, causing those chunks to be
Expand Down Expand Up @@ -290,7 +290,7 @@ workers. After all chunks are verified and the full-file SHA-256 check passes,

---

## WebSocket — клиентсервер
## WebSocket — clientserver

```json
{ "type": "Subscribe", "transferId": "uuid" }
Expand All @@ -299,13 +299,13 @@ workers. After all chunks are verified and the full-file SHA-256 check passes,
{ "type": "UnsubscribeAll" }
```

`Subscribe` без `transferId` wildcard: клиент получает события всех transfer-ов
на этом сервере. Cap (v0.4.0+): 256 различных подписок на сессию.
`Subscribe` without `transferId` is a wildcard: the client receives events for
every transfer on this server. Cap (v0.4.0+): 256 distinct subscriptions per session.

## WebSocket — серверклиент (`ProgressEvent`)
## WebSocket — serverclient (`ProgressEvent`)

Дискриминатор — `type`. Все события включают `transferId` (или `null` для
"глобальных") и `timestamp`.
Discriminator: `type`. Every event carries `transferId` (or `null` for global
events) and `timestamp`.

```json
{ "type": "TransferRegistered", "transferId": "...", "timestamp": ...,
Expand Down Expand Up @@ -356,7 +356,7 @@ every 200 ms.

## TCP wire — Frame layout

Каждый frame: `[4 байта BE length:u32][1 байт type:u8][payload bytes...]` — `length` — длина payload (без header).
Each frame: `[4 bytes BE length:u32][1 byte type:u8][payload bytes...]` — `length` is the payload size (header bytes excluded).

| Type | Name | Payload layout |
|---|---|---|
Expand All @@ -370,18 +370,19 @@ every 200 ms.
| 0x08 | BYE | (no payload) |
| 0x09 | DATA_END_V2 | `reqId:u32` `xxh3:bytes(16)` |

Все строки — UTF-8. Все длины — Big-Endian. UUID — 16 байт raw (8 байт MSB BE + 8 байт LSB BE).
All strings are UTF-8. All length fields are big-endian. UUID is 16 raw bytes
(8 bytes MSB BE + 8 bytes LSB BE).

Max payload size:
- HELLO/HELLO_OK/REQUEST/DATA_HEAD/DATA_END/DATA_END_V2/ERR/BYE: 64 KB
- DATA: до 1 MB (рекомендуется 256 KB чанки внутри одного REQUEST)
- HELLO / HELLO_OK / REQUEST / DATA_HEAD / DATA_END / DATA_END_V2 / ERR / BYE: 64 KB
- DATA: up to 1 MB (256 KB chunks within one REQUEST is the typical sizing)

Если frame превышает лимит — closing connection с ERR_BAD_REQUEST.
A frame exceeding its limit closes the connection with ERR_BAD_REQUEST.

Codes для ERR:
ERR codes:
- 1001 ERR_INCOMPATIBLE_VERSION
- 1002 ERR_UNAUTHORIZED
- 1010 ERR_NOT_FOUND (manifest или file нет)
- 1010 ERR_NOT_FOUND (manifest or file absent)
- 1020 ERR_BAD_REQUEST (out-of-range offset, etc.)
- 1500 ERR_INTERNAL

Expand Down
Loading