Skip to content

Commit 902ff9b

Browse files
esimkowitzsawkagithub-advanced-security[bot]coderabbitai[bot]
authored
enable wsh file cross-remote copy/move (#1725)
This adds the ability to stream `tar` archives over channels between `wsh` instances. The main use cases for this are remote copy and move operations. It also completes the `wavefs` implementation of the FileShare interface to allow copy/move interoperability between wavefiles and other storage types. The tar streaming functionality has been broken out into the new `tarcopy` package for easy reuse. New `fileshare` functions are added for `CopyInternal`, which allows copying files internal to a filesystem to bypass the expensive interop layer, and `MoveInternal`, which does the same for moving a file within a filesystem. Copying between remotes is now handled by `CopyRemote`, which accepts the source `FileShareClient` as a parameter. `wsh` connections use the same implementation for `CopyInternal` and `CopyRemote` as they need to request the channel on the remote destination, since we don't offer a way to pass channels as a parameter to a remote call. This also adds a recursive `-r` flag to `wsh file rm` to allow for deleting a directory and all its contents. S3 support will be addressed in a future PR. --------- Co-authored-by: sawka <mike@commandline.dev> Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
1 parent d224bd4 commit 902ff9b

23 files changed

Lines changed: 790 additions & 316 deletions

File tree

cmd/generatego/main-generatego.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ func GenerateWshClient() error {
3030
"github.com/wavetermdev/waveterm/pkg/waveobj",
3131
"github.com/wavetermdev/waveterm/pkg/wps",
3232
"github.com/wavetermdev/waveterm/pkg/vdom",
33+
"github.com/wavetermdev/waveterm/pkg/util/iochan/iochantypes",
3334
})
3435
wshDeclMap := wshrpc.GenerateWshCommandDeclMap()
3536
for _, key := range utilfn.GetOrderedMapKeys(wshDeclMap) {

cmd/wsh/cmd/wshcmd-file.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ func init() {
9898
fileCmd.AddCommand(fileListCmd)
9999
fileCmd.AddCommand(fileCatCmd)
100100
fileCmd.AddCommand(fileWriteCmd)
101+
fileRmCmd.Flags().BoolP("recursive", "r", false, "remove directories recursively")
101102
fileCmd.AddCommand(fileRmCmd)
102103
fileCmd.AddCommand(fileInfoCmd)
103104
fileCmd.AddCommand(fileAppendCmd)
@@ -259,6 +260,10 @@ func fileRmRun(cmd *cobra.Command, args []string) error {
259260
if err != nil {
260261
return err
261262
}
263+
recursive, err := cmd.Flags().GetBool("recursive")
264+
if err != nil {
265+
return err
266+
}
262267
fileData := wshrpc.FileData{
263268
Info: &wshrpc.FileInfo{
264269
Path: path}}
@@ -272,7 +277,7 @@ func fileRmRun(cmd *cobra.Command, args []string) error {
272277
return fmt.Errorf("getting file info: %w", err)
273278
}
274279

275-
err = wshclient.FileDeleteCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: DefaultFileTimeout})
280+
err = wshclient.FileDeleteCommand(RpcClient, wshrpc.CommandDeleteFileData{Path: path, Recursive: recursive}, &wshrpc.RpcOpts{Timeout: DefaultFileTimeout})
276281
if err != nil {
277282
return fmt.Errorf("removing file: %w", err)
278283
}

frontend/app/store/wshclientapi.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ class RpcApiType {
168168
}
169169

170170
// command "filedelete" [call]
171-
FileDeleteCommand(client: WshClient, data: FileData, opts?: RpcOpts): Promise<void> {
171+
FileDeleteCommand(client: WshClient, data: CommandDeleteFileData, opts?: RpcOpts): Promise<void> {
172172
return client.wshRpcCall("filedelete", data, opts);
173173
}
174174

@@ -203,7 +203,7 @@ class RpcApiType {
203203
}
204204

205205
// command "filestreamtar" [responsestream]
206-
FileStreamTarCommand(client: WshClient, data: CommandRemoteStreamTarData, opts?: RpcOpts): AsyncGenerator<string, void, boolean> {
206+
FileStreamTarCommand(client: WshClient, data: CommandRemoteStreamTarData, opts?: RpcOpts): AsyncGenerator<Packet, void, boolean> {
207207
return client.wshRpcStream("filestreamtar", data, opts);
208208
}
209209

@@ -258,7 +258,7 @@ class RpcApiType {
258258
}
259259

260260
// command "remotefiledelete" [call]
261-
RemoteFileDeleteCommand(client: WshClient, data: string, opts?: RpcOpts): Promise<void> {
261+
RemoteFileDeleteCommand(client: WshClient, data: CommandDeleteFileData, opts?: RpcOpts): Promise<void> {
262262
return client.wshRpcCall("remotefiledelete", data, opts);
263263
}
264264

@@ -313,7 +313,7 @@ class RpcApiType {
313313
}
314314

315315
// command "remotetarstream" [responsestream]
316-
RemoteTarStreamCommand(client: WshClient, data: CommandRemoteStreamTarData, opts?: RpcOpts): AsyncGenerator<string, void, boolean> {
316+
RemoteTarStreamCommand(client: WshClient, data: CommandRemoteStreamTarData, opts?: RpcOpts): AsyncGenerator<Packet, void, boolean> {
317317
return client.wshRpcStream("remotetarstream", data, opts);
318318
}
319319

frontend/types/gotypes.d.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,12 @@ declare global {
166166
blockid: string;
167167
};
168168

169+
// wshrpc.CommandDeleteFileData
170+
type CommandDeleteFileData = {
171+
path: string;
172+
recursive: boolean;
173+
};
174+
169175
// wshrpc.CommandDisposeData
170176
type CommandDisposeData = {
171177
routeid: string;
@@ -583,6 +589,12 @@ declare global {
583589
// waveobj.ORef
584590
type ORef = string;
585591

592+
// iochantypes.Packet
593+
type Packet = {
594+
Data: string;
595+
Checksum: string;
596+
};
597+
586598
// wshrpc.PathCommandData
587599
type PathCommandData = {
588600
pathtype: string;

pkg/remote/awsconn/awsconn.go

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -124,28 +124,13 @@ func ParseProfiles() map[string]struct{} {
124124
}
125125

126126
func ListBuckets(ctx context.Context, client *s3.Client) ([]types.Bucket, error) {
127-
var err error
128-
var output *s3.ListBucketsOutput
129-
var buckets []types.Bucket
130-
region := client.Options().Region
131-
bucketPaginator := s3.NewListBucketsPaginator(client, &s3.ListBucketsInput{BucketRegion: &region})
132-
for bucketPaginator.HasMorePages() {
133-
output, err = bucketPaginator.NextPage(ctx)
134-
log.Printf("output: %v", output)
135-
if err != nil {
136-
var apiErr smithy.APIError
137-
if errors.As(err, &apiErr) && apiErr.ErrorCode() == "AccessDenied" {
138-
fmt.Println("You don't have permission to list buckets for this account.")
139-
err = apiErr
140-
} else {
141-
return nil, fmt.Errorf("Couldn't list buckets for your account. Here's why: %v\n", err)
142-
}
143-
break
144-
}
145-
if output == nil {
146-
break
127+
output, err := client.ListBuckets(ctx, &s3.ListBucketsInput{})
128+
if err != nil {
129+
var apiErr smithy.APIError
130+
if errors.As(err, &apiErr) {
131+
return nil, fmt.Errorf("error listing buckets: %v", apiErr)
147132
}
148-
buckets = append(buckets, output.Buckets...)
133+
return nil, fmt.Errorf("error listing buckets: %v", err)
149134
}
150-
return buckets, nil
135+
return output.Buckets, nil
151136
}

pkg/remote/connparse/connparse.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ func (c *Connection) GetFullURI() string {
5757
return c.Scheme + "://" + c.GetPathWithHost()
5858
}
5959

60+
func (c *Connection) GetSchemeAndHost() string {
61+
return c.Scheme + "://" + c.Host
62+
}
63+
6064
func ParseURIAndReplaceCurrentHost(ctx context.Context, uri string) (*Connection, error) {
6165
conn, err := ParseURI(uri)
6266
if err != nil {
@@ -148,7 +152,7 @@ func ParseURI(uri string) (*Connection, error) {
148152
}
149153
if strings.HasPrefix(remotePath, "/~") {
150154
remotePath = strings.TrimPrefix(remotePath, "/")
151-
} else if len(remotePath) > 1 && !windowsDriveRegex.MatchString(remotePath) && !strings.HasPrefix(remotePath, "/") && !strings.HasPrefix(remotePath, "~") {
155+
} else if len(remotePath) > 1 && !windowsDriveRegex.MatchString(remotePath) && !strings.HasPrefix(remotePath, "/") && !strings.HasPrefix(remotePath, "~") && !strings.HasPrefix(remotePath, "./") && !strings.HasPrefix(remotePath, "../") && !strings.HasPrefix(remotePath, ".\\") && !strings.HasPrefix(remotePath, "..\\") && remotePath != ".." {
152156
remotePath = "/" + remotePath
153157
}
154158
}

pkg/remote/connparse/connparse_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,54 @@ func TestParseURI_WSHCurrentPathShorthand(t *testing.T) {
190190
}
191191
}
192192

193+
func TestParseURI_WSHCurrentPath(t *testing.T) {
194+
cstr := "./Documents/path/to/file"
195+
c, err := connparse.ParseURI(cstr)
196+
if err != nil {
197+
t.Fatalf("failed to parse URI: %v", err)
198+
}
199+
expected := "./Documents/path/to/file"
200+
if c.Path != expected {
201+
t.Fatalf("expected path to be %q, got %q", expected, c.Path)
202+
}
203+
expected = "current"
204+
if c.Host != expected {
205+
t.Fatalf("expected host to be %q, got %q", expected, c.Host)
206+
}
207+
expected = "wsh"
208+
if c.Scheme != expected {
209+
t.Fatalf("expected scheme to be %q, got %q", expected, c.Scheme)
210+
}
211+
expected = "wsh://current/./Documents/path/to/file"
212+
if c.GetFullURI() != expected {
213+
t.Fatalf("expected full URI to be %q, got %q", expected, c.GetFullURI())
214+
}
215+
}
216+
217+
func TestParseURI_WSHCurrentPathWindows(t *testing.T) {
218+
cstr := ".\\Documents\\path\\to\\file"
219+
c, err := connparse.ParseURI(cstr)
220+
if err != nil {
221+
t.Fatalf("failed to parse URI: %v", err)
222+
}
223+
expected := ".\\Documents\\path\\to\\file"
224+
if c.Path != expected {
225+
t.Fatalf("expected path to be %q, got %q", expected, c.Path)
226+
}
227+
expected = "current"
228+
if c.Host != expected {
229+
t.Fatalf("expected host to be %q, got %q", expected, c.Host)
230+
}
231+
expected = "wsh"
232+
if c.Scheme != expected {
233+
t.Fatalf("expected scheme to be %q, got %q", expected, c.Scheme)
234+
}
235+
expected = "wsh://current/.\\Documents\\path\\to\\file"
236+
if c.GetFullURI() != expected {
237+
t.Fatalf("expected full URI to be %q, got %q", expected, c.GetFullURI())
238+
}
239+
}
240+
193241
func TestParseURI_WSHLocalShorthand(t *testing.T) {
194242
t.Parallel()
195243
cstr := "/~/path/to/file"

pkg/remote/fileshare/fileshare.go

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,11 @@ import (
55
"fmt"
66
"log"
77

8-
"github.com/wavetermdev/waveterm/pkg/remote/awsconn"
98
"github.com/wavetermdev/waveterm/pkg/remote/connparse"
109
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/fstype"
11-
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/s3fs"
1210
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/wavefs"
1311
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/wshfs"
12+
"github.com/wavetermdev/waveterm/pkg/util/iochan/iochantypes"
1413
"github.com/wavetermdev/waveterm/pkg/wshrpc"
1514
"github.com/wavetermdev/waveterm/pkg/wshutil"
1615
)
@@ -29,12 +28,12 @@ func CreateFileShareClient(ctx context.Context, connection string) (fstype.FileS
2928
}
3029
conntype := conn.GetType()
3130
if conntype == connparse.ConnectionTypeS3 {
32-
config, err := awsconn.GetConfig(ctx, connection)
33-
if err != nil {
34-
log.Printf("error getting aws config: %v", err)
35-
return nil, nil
36-
}
37-
return s3fs.NewS3Client(config), conn
31+
// config, err := awsconn.GetConfig(ctx, connection)
32+
// if err != nil {
33+
// log.Printf("error getting aws config: %v", err)
34+
// return nil, nil
35+
// }
36+
return nil, nil
3837
} else if conntype == connparse.ConnectionTypeWave {
3938
return wavefs.NewWaveClient(), conn
4039
} else if conntype == connparse.ConnectionTypeWsh {
@@ -61,10 +60,10 @@ func ReadStream(ctx context.Context, data wshrpc.FileData) <-chan wshrpc.RespOrE
6160
return client.ReadStream(ctx, conn, data)
6261
}
6362

64-
func ReadTarStream(ctx context.Context, data wshrpc.CommandRemoteStreamTarData) <-chan wshrpc.RespOrErrorUnion[[]byte] {
63+
func ReadTarStream(ctx context.Context, data wshrpc.CommandRemoteStreamTarData) <-chan wshrpc.RespOrErrorUnion[iochantypes.Packet] {
6564
client, conn := CreateFileShareClient(ctx, data.Path)
6665
if conn == nil || client == nil {
67-
return wshutil.SendErrCh[[]byte](fmt.Errorf(ErrorParsingConnection, data.Path))
66+
return wshutil.SendErrCh[iochantypes.Packet](fmt.Errorf(ErrorParsingConnection, data.Path))
6867
}
6968
return client.ReadTarStream(ctx, conn, data.Opts)
7069
}
@@ -110,35 +109,47 @@ func Mkdir(ctx context.Context, path string) error {
110109
}
111110

112111
func Move(ctx context.Context, data wshrpc.CommandFileCopyData) error {
113-
srcConn, err := connparse.ParseURIAndReplaceCurrentHost(ctx, data.SrcUri)
114-
if err != nil {
115-
return fmt.Errorf("error parsing source connection %s: %v", data.SrcUri, err)
112+
srcClient, srcConn := CreateFileShareClient(ctx, data.SrcUri)
113+
if srcConn == nil || srcClient == nil {
114+
return fmt.Errorf("error creating fileshare client, could not parse source connection %s", data.SrcUri)
116115
}
117116
destClient, destConn := CreateFileShareClient(ctx, data.DestUri)
118117
if destConn == nil || destClient == nil {
119-
return fmt.Errorf("error creating fileshare client, could not parse connection %s or %s", data.SrcUri, data.DestUri)
118+
return fmt.Errorf("error creating fileshare client, could not parse destination connection %s", data.DestUri)
119+
}
120+
if srcConn.Host != destConn.Host {
121+
err := destClient.CopyRemote(ctx, srcConn, destConn, srcClient, data.Opts)
122+
if err != nil {
123+
return fmt.Errorf("cannot copy %q to %q: %w", data.SrcUri, data.DestUri, err)
124+
}
125+
return srcClient.Delete(ctx, srcConn, data.Opts.Recursive)
126+
} else {
127+
return srcClient.MoveInternal(ctx, srcConn, destConn, data.Opts)
120128
}
121-
return destClient.Move(ctx, srcConn, destConn, data.Opts)
122129
}
123130

124131
func Copy(ctx context.Context, data wshrpc.CommandFileCopyData) error {
125-
srcConn, err := connparse.ParseURIAndReplaceCurrentHost(ctx, data.SrcUri)
126-
if err != nil {
127-
return fmt.Errorf("error parsing source connection %s: %v", data.SrcUri, err)
132+
srcClient, srcConn := CreateFileShareClient(ctx, data.SrcUri)
133+
if srcConn == nil || srcClient == nil {
134+
return fmt.Errorf("error creating fileshare client, could not parse source connection %s", data.SrcUri)
128135
}
129136
destClient, destConn := CreateFileShareClient(ctx, data.DestUri)
130137
if destConn == nil || destClient == nil {
131-
return fmt.Errorf("error creating fileshare client, could not parse connection %s or %s", data.SrcUri, data.DestUri)
138+
return fmt.Errorf("error creating fileshare client, could not parse destination connection %s", data.DestUri)
139+
}
140+
if srcConn.Host != destConn.Host {
141+
return destClient.CopyRemote(ctx, srcConn, destConn, srcClient, data.Opts)
142+
} else {
143+
return srcClient.CopyInternal(ctx, srcConn, destConn, data.Opts)
132144
}
133-
return destClient.Copy(ctx, srcConn, destConn, data.Opts)
134145
}
135146

136-
func Delete(ctx context.Context, path string) error {
137-
client, conn := CreateFileShareClient(ctx, path)
147+
func Delete(ctx context.Context, data wshrpc.CommandDeleteFileData) error {
148+
client, conn := CreateFileShareClient(ctx, data.Path)
138149
if conn == nil || client == nil {
139-
return fmt.Errorf(ErrorParsingConnection, path)
150+
return fmt.Errorf(ErrorParsingConnection, data.Path)
140151
}
141-
return client.Delete(ctx, conn)
152+
return client.Delete(ctx, conn, data.Recursive)
142153
}
143154

144155
func Join(ctx context.Context, path string, parts ...string) (string, error) {

pkg/remote/fileshare/fstype/fstype.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88

99
"github.com/wavetermdev/waveterm/pkg/remote/connparse"
10+
"github.com/wavetermdev/waveterm/pkg/util/iochan/iochantypes"
1011
"github.com/wavetermdev/waveterm/pkg/wshrpc"
1112
)
1213

@@ -18,7 +19,7 @@ type FileShareClient interface {
1819
// ReadStream returns a stream of file data at the given path. If it's a directory, then the list of entries
1920
ReadStream(ctx context.Context, conn *connparse.Connection, data wshrpc.FileData) <-chan wshrpc.RespOrErrorUnion[wshrpc.FileData]
2021
// ReadTarStream returns a stream of tar data at the given path
21-
ReadTarStream(ctx context.Context, conn *connparse.Connection, opts *wshrpc.FileCopyOpts) <-chan wshrpc.RespOrErrorUnion[[]byte]
22+
ReadTarStream(ctx context.Context, conn *connparse.Connection, opts *wshrpc.FileCopyOpts) <-chan wshrpc.RespOrErrorUnion[iochantypes.Packet]
2223
// ListEntries returns the list of entries at the given path, or nothing if the path is a file
2324
ListEntries(ctx context.Context, conn *connparse.Connection, opts *wshrpc.FileListOpts) ([]*wshrpc.FileInfo, error)
2425
// ListEntriesStream returns a stream of entries at the given path
@@ -29,12 +30,14 @@ type FileShareClient interface {
2930
AppendFile(ctx context.Context, conn *connparse.Connection, data wshrpc.FileData) error
3031
// Mkdir creates a directory at the given path
3132
Mkdir(ctx context.Context, conn *connparse.Connection) error
32-
// Move moves the file from srcConn to destConn
33-
Move(ctx context.Context, srcConn, destConn *connparse.Connection, opts *wshrpc.FileCopyOpts) error
34-
// Copy copies the file from srcConn to destConn
35-
Copy(ctx context.Context, srcConn, destConn *connparse.Connection, opts *wshrpc.FileCopyOpts) error
33+
// Move moves the file within the same connection
34+
MoveInternal(ctx context.Context, srcConn, destConn *connparse.Connection, opts *wshrpc.FileCopyOpts) error
35+
// Copy copies the file within the same connection
36+
CopyInternal(ctx context.Context, srcConn, destConn *connparse.Connection, opts *wshrpc.FileCopyOpts) error
37+
// CopyRemote copies the file between different connections
38+
CopyRemote(ctx context.Context, srcConn, destConn *connparse.Connection, srcClient FileShareClient, opts *wshrpc.FileCopyOpts) error
3639
// Delete deletes the entry at the given path
37-
Delete(ctx context.Context, conn *connparse.Connection) error
40+
Delete(ctx context.Context, conn *connparse.Connection, recursive bool) error
3841
// Join joins the given parts to the connection path
3942
Join(ctx context.Context, conn *connparse.Connection, parts ...string) (string, error)
4043
// GetConnectionType returns the type of connection for the fileshare

0 commit comments

Comments
 (0)