Skip to content

Commit 71e1260

Browse files
esimkowitzcoderabbitai[bot]sawkaoneirocosm
authored
Add S3 fileshare implementation, improve cp behavior (#1896)
Adds the S3 `fileshare` implementation This also updates `wsh file cp` so it behaves more like `cp` for things like copying directories and directory entries. It's not meant to align with `cp` on everything, though. Our `wsh cp` will be recursive and will create intermediate directories by default. This also adds new aliases for `wsh view`: `wsh preview` and `wsh open` --------- Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> Co-authored-by: sawka <mike@commandline.dev> Co-authored-by: Sylvia Crowe <software@oneirocosm.com>
1 parent eff01f0 commit 71e1260

40 files changed

Lines changed: 2263 additions & 712 deletions

File tree

cmd/wsh/cmd/wshcmd-connserver.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/wshfs"
2222
"github.com/wavetermdev/waveterm/pkg/util/packetparser"
2323
"github.com/wavetermdev/waveterm/pkg/util/sigutil"
24+
"github.com/wavetermdev/waveterm/pkg/util/utilfn"
2425
"github.com/wavetermdev/waveterm/pkg/wavebase"
2526
"github.com/wavetermdev/waveterm/pkg/wshrpc"
2627
"github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient"
@@ -162,9 +163,7 @@ func serverRunRouter(jwtToken string) error {
162163
// just ignore and drain the rawCh (stdin)
163164
// when stdin is closed, shutdown
164165
defer wshutil.DoShutdown("", 0, true)
165-
for range rawCh {
166-
// ignore
167-
}
166+
utilfn.DrainChannelSafe(rawCh, "serverRunRouter:stdin")
168167
}()
169168
go func() {
170169
for msg := range termProxy.FromRemoteCh {

cmd/wsh/cmd/wshcmd-file-util.go

Lines changed: 11 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
package cmd
55

66
import (
7+
"context"
78
"encoding/base64"
89
"fmt"
910
"io"
1011
"io/fs"
1112
"strings"
1213

1314
"github.com/wavetermdev/waveterm/pkg/remote/connparse"
15+
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/fsutil"
1416
"github.com/wavetermdev/waveterm/pkg/util/fileutil"
1517
"github.com/wavetermdev/waveterm/pkg/util/wavefileutil"
1618
"github.com/wavetermdev/waveterm/pkg/wshrpc"
@@ -27,15 +29,15 @@ func convertNotFoundErr(err error) error {
2729
return err
2830
}
2931

30-
func ensureFile(origName string, fileData wshrpc.FileData) (*wshrpc.FileInfo, error) {
31-
info, err := wshclient.FileInfoCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: DefaultFileTimeout})
32+
func ensureFile(fileData wshrpc.FileData) (*wshrpc.FileInfo, error) {
33+
info, err := wshclient.FileInfoCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: fileTimeout})
3234
err = convertNotFoundErr(err)
3335
if err == fs.ErrNotExist {
34-
err = wshclient.FileCreateCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: DefaultFileTimeout})
36+
err = wshclient.FileCreateCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: fileTimeout})
3537
if err != nil {
3638
return nil, fmt.Errorf("creating file: %w", err)
3739
}
38-
info, err = wshclient.FileInfoCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: DefaultFileTimeout})
40+
info, err = wshclient.FileInfoCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: fileTimeout})
3941
if err != nil {
4042
return nil, fmt.Errorf("getting file info: %w", err)
4143
}
@@ -51,12 +53,12 @@ func streamWriteToFile(fileData wshrpc.FileData, reader io.Reader) error {
5153
// First truncate the file with an empty write
5254
emptyWrite := fileData
5355
emptyWrite.Data64 = ""
54-
err := wshclient.FileWriteCommand(RpcClient, emptyWrite, &wshrpc.RpcOpts{Timeout: DefaultFileTimeout})
56+
err := wshclient.FileWriteCommand(RpcClient, emptyWrite, &wshrpc.RpcOpts{Timeout: fileTimeout})
5557
if err != nil {
5658
return fmt.Errorf("initializing file with empty write: %w", err)
5759
}
5860

59-
const chunkSize = 32 * 1024 // 32KB chunks
61+
const chunkSize = wshrpc.FileChunkSize // 32KB chunks
6062
buf := make([]byte, chunkSize)
6163
totalWritten := int64(0)
6264

@@ -89,40 +91,9 @@ func streamWriteToFile(fileData wshrpc.FileData, reader io.Reader) error {
8991
return nil
9092
}
9193

92-
func streamReadFromFile(fileData wshrpc.FileData, size int64, writer io.Writer) error {
93-
const chunkSize = 32 * 1024 // 32KB chunks
94-
for offset := int64(0); offset < size; offset += chunkSize {
95-
// Calculate the length of this chunk
96-
length := chunkSize
97-
if offset+int64(length) > size {
98-
length = int(size - offset)
99-
}
100-
101-
// Set up the ReadAt request
102-
fileData.At = &wshrpc.FileDataAt{
103-
Offset: offset,
104-
Size: length,
105-
}
106-
107-
// Read the chunk
108-
data, err := wshclient.FileReadCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: int64(fileTimeout)})
109-
if err != nil {
110-
return fmt.Errorf("reading chunk at offset %d: %w", offset, err)
111-
}
112-
113-
// Decode and write the chunk
114-
chunk, err := base64.StdEncoding.DecodeString(data.Data64)
115-
if err != nil {
116-
return fmt.Errorf("decoding chunk at offset %d: %w", offset, err)
117-
}
118-
119-
_, err = writer.Write(chunk)
120-
if err != nil {
121-
return fmt.Errorf("writing chunk at offset %d: %w", offset, err)
122-
}
123-
}
124-
125-
return nil
94+
func streamReadFromFile(ctx context.Context, fileData wshrpc.FileData, writer io.Writer) error {
95+
ch := wshclient.FileReadStreamCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: fileTimeout})
96+
return fsutil.ReadFileStreamToWriter(ctx, ch, writer)
12697
}
12798

12899
type fileListResult struct {

cmd/wsh/cmd/wshcmd-file.go

Lines changed: 37 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"encoding/base64"
1010
"fmt"
1111
"io"
12-
"io/fs"
1312
"log"
1413
"os"
1514
"path"
@@ -31,8 +30,7 @@ const (
3130
WaveFileScheme = "wavefile"
3231
WaveFilePrefix = "wavefile://"
3332

34-
DefaultFileTimeout = 5000
35-
TimeoutYear = int64(365) * 24 * 60 * 60 * 1000
33+
TimeoutYear = int64(365) * 24 * 60 * 60 * 1000
3634

3735
UriHelpText = `
3836
@@ -83,12 +81,12 @@ Wave Terminal is capable of managing files from remote SSH hosts, S3-compatible
8381
systems, and the internal Wave filesystem. Files are addressed via URIs, which
8482
vary depending on the storage system.` + UriHelpText}
8583

86-
var fileTimeout int
84+
var fileTimeout int64
8785

8886
func init() {
8987
rootCmd.AddCommand(fileCmd)
9088

91-
fileCmd.PersistentFlags().IntVarP(&fileTimeout, "timeout", "t", 15000, "timeout in milliseconds for long operations")
89+
fileCmd.PersistentFlags().Int64VarP(&fileTimeout, "timeout", "t", 15000, "timeout in milliseconds for long operations")
9290

9391
fileListCmd.Flags().BoolP("recursive", "r", false, "list subdirectories recursively")
9492
fileListCmd.Flags().BoolP("long", "l", false, "use long listing format")
@@ -103,7 +101,6 @@ func init() {
103101
fileCmd.AddCommand(fileInfoCmd)
104102
fileCmd.AddCommand(fileAppendCmd)
105103
fileCpCmd.Flags().BoolP("merge", "m", false, "merge directories")
106-
fileCpCmd.Flags().BoolP("recursive", "r", false, "copy directories recursively")
107104
fileCpCmd.Flags().BoolP("force", "f", false, "force overwrite of existing files")
108105
fileCmd.AddCommand(fileCpCmd)
109106
fileMvCmd.Flags().BoolP("recursive", "r", false, "move directories recursively")
@@ -174,7 +171,7 @@ var fileAppendCmd = &cobra.Command{
174171
var fileCpCmd = &cobra.Command{
175172
Use: "cp [source-uri] [destination-uri]" + UriHelpText,
176173
Aliases: []string{"copy"},
177-
Short: "copy files between storage systems",
174+
Short: "copy files between storage systems, recursively if needed",
178175
Long: "Copy files between different storage systems." + UriHelpText,
179176
Example: " wsh file cp wavefile://block/config.txt ./local-config.txt\n wsh file cp ./local-config.txt wavefile://block/config.txt\n wsh file cp wsh://user@ec2/home/user/config.txt wavefile://client/config.txt",
180177
Args: cobra.ExactArgs(2),
@@ -202,17 +199,7 @@ func fileCatRun(cmd *cobra.Command, args []string) error {
202199
Info: &wshrpc.FileInfo{
203200
Path: path}}
204201

205-
// Get file info first to check existence and get size
206-
info, err := wshclient.FileInfoCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: 2000})
207-
err = convertNotFoundErr(err)
208-
if err == fs.ErrNotExist {
209-
return fmt.Errorf("%s: no such file", path)
210-
}
211-
if err != nil {
212-
return fmt.Errorf("getting file info: %w", err)
213-
}
214-
215-
err = streamReadFromFile(fileData, info.Size, os.Stdout)
202+
err = streamReadFromFile(cmd.Context(), fileData, os.Stdout)
216203
if err != nil {
217204
return fmt.Errorf("reading file: %w", err)
218205
}
@@ -229,7 +216,7 @@ func fileInfoRun(cmd *cobra.Command, args []string) error {
229216
Info: &wshrpc.FileInfo{
230217
Path: path}}
231218

232-
info, err := wshclient.FileInfoCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: DefaultFileTimeout})
219+
info, err := wshclient.FileInfoCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: fileTimeout})
233220
err = convertNotFoundErr(err)
234221
if err != nil {
235222
return fmt.Errorf("getting file info: %w", err)
@@ -265,20 +252,8 @@ func fileRmRun(cmd *cobra.Command, args []string) error {
265252
if err != nil {
266253
return err
267254
}
268-
fileData := wshrpc.FileData{
269-
Info: &wshrpc.FileInfo{
270-
Path: path}}
271-
272-
_, err = wshclient.FileInfoCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: DefaultFileTimeout})
273-
err = convertNotFoundErr(err)
274-
if err == fs.ErrNotExist {
275-
return fmt.Errorf("%s: no such file", path)
276-
}
277-
if err != nil {
278-
return fmt.Errorf("getting file info: %w", err)
279-
}
280255

281-
err = wshclient.FileDeleteCommand(RpcClient, wshrpc.CommandDeleteFileData{Path: path, Recursive: recursive}, &wshrpc.RpcOpts{Timeout: DefaultFileTimeout})
256+
err = wshclient.FileDeleteCommand(RpcClient, wshrpc.CommandDeleteFileData{Path: path, Recursive: recursive}, &wshrpc.RpcOpts{Timeout: fileTimeout})
282257
if err != nil {
283258
return fmt.Errorf("removing file: %w", err)
284259
}
@@ -295,14 +270,31 @@ func fileWriteRun(cmd *cobra.Command, args []string) error {
295270
Info: &wshrpc.FileInfo{
296271
Path: path}}
297272

298-
_, err = ensureFile(path, fileData)
273+
capability, err := wshclient.FileShareCapabilityCommand(RpcClient, fileData.Info.Path, &wshrpc.RpcOpts{Timeout: fileTimeout})
299274
if err != nil {
300-
return err
275+
return fmt.Errorf("getting fileshare capability: %w", err)
301276
}
302-
303-
err = streamWriteToFile(fileData, WrappedStdin)
304-
if err != nil {
305-
return fmt.Errorf("writing file: %w", err)
277+
if capability.CanAppend {
278+
err = streamWriteToFile(fileData, WrappedStdin)
279+
if err != nil {
280+
return fmt.Errorf("writing file: %w", err)
281+
}
282+
} else {
283+
buf := make([]byte, MaxFileSize)
284+
n, err := WrappedStdin.Read(buf)
285+
if err != nil && err != io.EOF {
286+
return fmt.Errorf("reading input: %w", err)
287+
}
288+
if int64(n) == MaxFileSize {
289+
if _, err := WrappedStdin.Read(make([]byte, 1)); err != io.EOF {
290+
return fmt.Errorf("input exceeds maximum file size of %d bytes", MaxFileSize)
291+
}
292+
}
293+
fileData.Data64 = base64.StdEncoding.EncodeToString(buf[:n])
294+
err = wshclient.FileWriteCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: fileTimeout})
295+
if err != nil {
296+
return fmt.Errorf("writing file: %w", err)
297+
}
306298
}
307299

308300
return nil
@@ -317,7 +309,7 @@ func fileAppendRun(cmd *cobra.Command, args []string) error {
317309
Info: &wshrpc.FileInfo{
318310
Path: path}}
319311

320-
info, err := ensureFile(path, fileData)
312+
info, err := ensureFile(fileData)
321313
if err != nil {
322314
return err
323315
}
@@ -346,7 +338,7 @@ func fileAppendRun(cmd *cobra.Command, args []string) error {
346338

347339
if buf.Len() >= 8192 { // 8KB batch size
348340
fileData.Data64 = base64.StdEncoding.EncodeToString(buf.Bytes())
349-
err = wshclient.FileAppendCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: int64(fileTimeout)})
341+
err = wshclient.FileAppendCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: fileTimeout})
350342
if err != nil {
351343
return fmt.Errorf("appending to file: %w", err)
352344
}
@@ -357,7 +349,7 @@ func fileAppendRun(cmd *cobra.Command, args []string) error {
357349

358350
if buf.Len() > 0 {
359351
fileData.Data64 = base64.StdEncoding.EncodeToString(buf.Bytes())
360-
err = wshclient.FileAppendCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: int64(fileTimeout)})
352+
err = wshclient.FileAppendCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: fileTimeout})
361353
if err != nil {
362354
return fmt.Errorf("appending to file: %w", err)
363355
}
@@ -398,10 +390,6 @@ func getTargetPath(src, dst string) (string, error) {
398390

399391
func fileCpRun(cmd *cobra.Command, args []string) error {
400392
src, dst := args[0], args[1]
401-
recursive, err := cmd.Flags().GetBool("recursive")
402-
if err != nil {
403-
return err
404-
}
405393
merge, err := cmd.Flags().GetBool("merge")
406394
if err != nil {
407395
return err
@@ -419,9 +407,9 @@ func fileCpRun(cmd *cobra.Command, args []string) error {
419407
if err != nil {
420408
return fmt.Errorf("unable to parse dest path: %w", err)
421409
}
422-
log.Printf("Copying %s to %s; recursive: %v, merge: %v, force: %v", srcPath, destPath, recursive, merge, force)
410+
log.Printf("Copying %s to %s; merge: %v, force: %v", srcPath, destPath, merge, force)
423411
rpcOpts := &wshrpc.RpcOpts{Timeout: TimeoutYear}
424-
err = wshclient.FileCopyCommand(RpcClient, wshrpc.CommandFileCopyData{SrcUri: srcPath, DestUri: destPath, Opts: &wshrpc.FileCopyOpts{Recursive: recursive, Merge: merge, Overwrite: force, Timeout: TimeoutYear}}, rpcOpts)
412+
err = wshclient.FileCopyCommand(RpcClient, wshrpc.CommandFileCopyData{SrcUri: srcPath, DestUri: destPath, Opts: &wshrpc.FileCopyOpts{Merge: merge, Overwrite: force, Timeout: TimeoutYear}}, rpcOpts)
425413
if err != nil {
426414
return fmt.Errorf("copying file: %w", err)
427415
}
@@ -449,7 +437,7 @@ func fileMvRun(cmd *cobra.Command, args []string) error {
449437
}
450438
log.Printf("Moving %s to %s; recursive: %v, force: %v", srcPath, destPath, recursive, force)
451439
rpcOpts := &wshrpc.RpcOpts{Timeout: TimeoutYear}
452-
err = wshclient.FileMoveCommand(RpcClient, wshrpc.CommandFileCopyData{SrcUri: srcPath, DestUri: destPath, Opts: &wshrpc.FileCopyOpts{Recursive: recursive, Overwrite: force, Timeout: TimeoutYear}}, rpcOpts)
440+
err = wshclient.FileMoveCommand(RpcClient, wshrpc.CommandFileCopyData{SrcUri: srcPath, DestUri: destPath, Opts: &wshrpc.FileCopyOpts{Overwrite: force, Timeout: TimeoutYear, Recursive: recursive}}, rpcOpts)
453441
if err != nil {
454442
return fmt.Errorf("moving file: %w", err)
455443
}
@@ -562,10 +550,7 @@ func fileListRun(cmd *cobra.Command, args []string) error {
562550

563551
filesChan := wshclient.FileListStreamCommand(RpcClient, wshrpc.FileListData{Path: path, Opts: &wshrpc.FileListOpts{All: recursive}}, &wshrpc.RpcOpts{Timeout: 2000})
564552
// Drain the channel when done
565-
defer func() {
566-
for range filesChan {
567-
}
568-
}()
553+
defer utilfn.DrainChannelSafe(filesChan, "fileListRun")
569554
if longForm {
570555
return filePrintLong(filesChan)
571556
}

cmd/wsh/cmd/wshcmd-view.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ var viewMagnified bool
1919

2020
var viewCmd = &cobra.Command{
2121
Use: "view {file|directory|URL}",
22+
Aliases: []string{"preview", "open"},
2223
Short: "preview/edit a file or directory",
2324
RunE: viewRun,
2425
PreRunE: preRunSetupRpcClient,

frontend/app/block/blockframe.tsx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -604,7 +604,8 @@ const BlockFrame_Default_Component = (props: BlockFrameProps) => {
604604
"--magnified-block-blur": `${magnifiedBlockBlur}px`,
605605
} as React.CSSProperties
606606
}
607-
{...({ inert: preview ? "1" : undefined } as any)} // sets insert="1" ... but tricks TS into accepting it
607+
// @ts-ignore: inert does exist in the DOM, just not in react
608+
inert={preview ? "1" : undefined} //
608609
>
609610
<BlockMask nodeModel={nodeModel} />
610611
{preview || viewModel == null ? null : (

frontend/app/modals/conntypeahead.tsx

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -377,13 +377,10 @@ const ChangeConnectionBlockModal = React.memo(
377377
// typeahead was opened. good candidate for verbose log level.
378378
//console.log("unable to load wsl list from backend. using blank list: ", e)
379379
});
380-
/////////
381-
// TODO-S3
382-
// this needs an rpc call to generate a list of s3 profiles
383-
const newS3List = [];
384-
setS3List(newS3List);
385-
/////////
386-
}, [changeConnModalOpen, setConnList]);
380+
RpcApi.ConnListAWSCommand(TabRpcClient, { timeout: 2000 })
381+
.then((s3List) => setS3List(s3List ?? []))
382+
.catch((e) => console.log("unable to load s3 list from backend:", e));
383+
}, [changeConnModalOpen]);
387384

388385
const changeConnection = React.useCallback(
389386
async (connName: string) => {
@@ -393,10 +390,13 @@ const ChangeConnectionBlockModal = React.memo(
393390
if (connName == blockData?.meta?.connection) {
394391
return;
395392
}
393+
const isAws = connName?.startsWith("aws:");
396394
const oldCwd = blockData?.meta?.file ?? "";
397395
let newCwd: string;
398396
if (oldCwd == "") {
399397
newCwd = "";
398+
} else if (isAws) {
399+
newCwd = "/";
400400
} else {
401401
newCwd = "~";
402402
}

frontend/app/store/global.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -672,6 +672,17 @@ function getConnStatusAtom(conn: string): PrimitiveAtom<ConnStatus> {
672672
wshenabled: false,
673673
};
674674
rtn = atom(connStatus);
675+
} else if (conn.startsWith("aws:")) {
676+
const connStatus: ConnStatus = {
677+
connection: conn,
678+
connected: true,
679+
error: null,
680+
status: "connected",
681+
hasconnected: true,
682+
activeconnnum: 0,
683+
wshenabled: false,
684+
};
685+
rtn = atom(connStatus);
675686
} else {
676687
const connStatus: ConnStatus = {
677688
connection: conn,

0 commit comments

Comments
 (0)