Skip to content

Commit 3e6715d

Browse files
committed
Refactor ctl
1 parent 0030ab8 commit 3e6715d

37 files changed

+783
-675
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
.PHONY: license
1414
build:
15-
go build -v -o bin/function-stream ./cmd
15+
go build -v -o bin/fs ./cmd
1616

1717
build-example:
1818
tinygo build -o bin/example_basic.wasm -target=wasi ./examples/basic

apiclient/apiclient.go

Lines changed: 36 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -359,119 +359,90 @@ type Response struct {
359359
HttpResponse *http.Response
360360
}
361361

362-
func (c *APIClient) FunctionService() *FunctionService {
363-
return &FunctionService{
364-
client: c,
362+
func (c *APIClient) FunctionService() *ResourceService[model.Function] {
363+
return &ResourceService[model.Function]{
364+
Client: c,
365+
BasePath: "/apis/v1/function",
365366
}
366367
}
367368

368-
type FunctionService struct {
369-
client *APIClient
370-
}
371-
372-
func (fs *FunctionService) Deploy(ctx context.Context, f *model.Function) error {
373-
req, err := fs.client.prepareRequest(ctx, "/apis/v1/functions", http.MethodPost, f, nil, nil, nil, nil)
374-
if err != nil {
375-
return err
376-
}
377-
_, err = fs.client.HandleResponse(fs.client.callAPI(req))
378-
return err
379-
}
380-
381-
func (fs *FunctionService) List(ctx context.Context) ([]*model.Function, error) {
382-
req, err := fs.client.prepareRequest(ctx, "/apis/v1/functions", http.MethodGet, nil, nil, nil, nil, nil)
383-
if err != nil {
384-
return nil, err
385-
}
386-
resp, err := fs.client.HandleResponse(fs.client.callAPI(req))
387-
if err != nil {
388-
return nil, err
369+
func (c *APIClient) PackageService() *ResourceService[model.Package] {
370+
return &ResourceService[model.Package]{
371+
Client: c,
372+
BasePath: "/apis/v1/package",
389373
}
390-
var functions []*model.Function
391-
err = json.Unmarshal(resp, &functions)
392-
if err != nil {
393-
return nil, err
394-
}
395-
return functions, nil
396374
}
397375

398-
func (fs *FunctionService) Delete(ctx context.Context, name string) error {
399-
req, err := fs.client.prepareRequest(ctx, "/apis/v1/functions/"+name, http.MethodDelete, nil, nil, nil, nil, nil)
400-
if err != nil {
401-
return err
376+
func (c *APIClient) GenericService(resourceName string) *ResourceService[map[string]any] {
377+
return &ResourceService[map[string]any]{
378+
Client: c,
379+
BasePath: "/apis/v1/" + resourceName,
402380
}
403-
_, err = fs.client.HandleResponse(fs.client.callAPI(req))
404-
return err
405-
}
406-
407-
type PackageService struct {
408-
client *APIClient
409381
}
410382

411-
func (c *APIClient) PackageService() *PackageService {
412-
return &PackageService{
413-
client: c,
414-
}
383+
type ResourceService[T any] struct {
384+
BasePath string
385+
Client *APIClient
415386
}
416387

417-
func (ps *PackageService) Create(ctx context.Context, pkg *model.Package) error {
418-
req, err := ps.client.prepareRequest(ctx, "/apis/v1/packages", http.MethodPost, pkg, nil, nil, nil, nil)
388+
func (rs *ResourceService[T]) Create(ctx context.Context, resource *T) error {
389+
req, err := rs.Client.prepareRequest(ctx, rs.BasePath, http.MethodPost, resource, nil, nil, nil, nil)
419390
if err != nil {
420391
return err
421392
}
422-
_, err = ps.client.HandleResponse(ps.client.callAPI(req))
393+
_, err = rs.Client.HandleResponse(rs.Client.callAPI(req))
423394
return err
424395
}
425396

426-
func (ps *PackageService) List(ctx context.Context) ([]*model.Package, error) {
427-
req, err := ps.client.prepareRequest(ctx, "/apis/v1/packages", http.MethodGet, nil, nil, nil, nil, nil)
397+
func (rs *ResourceService[T]) List(ctx context.Context) ([]*T, error) {
398+
req, err := rs.Client.prepareRequest(ctx, rs.BasePath, http.MethodGet, nil, nil, nil, nil, nil)
428399
if err != nil {
429400
return nil, err
430401
}
431-
resp, err := ps.client.HandleResponse(ps.client.callAPI(req))
402+
resp, err := rs.Client.HandleResponse(rs.Client.callAPI(req))
432403
if err != nil {
433404
return nil, err
434405
}
435-
var packages []*model.Package
436-
err = json.Unmarshal(resp, &packages)
406+
var resources []*T
407+
err = json.Unmarshal(resp, &resources)
437408
if err != nil {
438409
return nil, err
439410
}
440-
return packages, nil
411+
return resources, nil
441412
}
442413

443-
func (ps *PackageService) Read(ctx context.Context, name string) (*model.Package, error) {
444-
req, err := ps.client.prepareRequest(ctx, "/apis/v1/packages/"+name, http.MethodGet, nil, nil, nil, nil, nil)
414+
func (rs *ResourceService[T]) Read(ctx context.Context, name string) (*T, error) {
415+
req, err := rs.Client.prepareRequest(ctx, rs.BasePath+"/"+name, http.MethodGet, nil, nil, nil, nil, nil)
445416
if err != nil {
446417
return nil, err
447418
}
448-
resp, err := ps.client.HandleResponse(ps.client.callAPI(req))
419+
resp, err := rs.Client.HandleResponse(rs.Client.callAPI(req))
449420
if err != nil {
450421
return nil, err
451422
}
452-
var pkg model.Package
453-
err = json.Unmarshal(resp, &pkg)
423+
var resource T
424+
err = json.Unmarshal(resp, &resource)
454425
if err != nil {
455426
return nil, err
456427
}
457-
return &pkg, nil
428+
return &resource, nil
458429
}
459430

460-
func (ps *PackageService) Update(ctx context.Context, pkg *model.Package) error {
461-
req, err := ps.client.prepareRequest(ctx, "/apis/v1/packages/"+pkg.Name, http.MethodPut, pkg, nil, nil, nil, nil)
431+
func (rs *ResourceService[T]) Upsert(ctx context.Context, resource *T) error {
432+
req, err := rs.Client.prepareRequest(ctx, rs.BasePath, http.MethodPut, resource, nil, nil, nil, nil)
462433
if err != nil {
463434
return err
464435
}
465-
_, err = ps.client.HandleResponse(ps.client.callAPI(req))
436+
_, err = rs.Client.HandleResponse(rs.Client.callAPI(req))
466437
return err
467438
}
468439

469-
func (ps *PackageService) Delete(ctx context.Context, name string) error {
470-
req, err := ps.client.prepareRequest(ctx, "/apis/v1/packages/"+name, http.MethodDelete, nil, nil, nil, nil, nil)
440+
func (rs *ResourceService[T]) Delete(ctx context.Context, name string) error {
441+
req, err := rs.Client.prepareRequest(ctx, rs.BasePath+"/"+name, http.MethodDelete, nil, nil, nil, nil, nil)
471442
if err != nil {
472443
return err
473444
}
474-
_, err = ps.client.HandleResponse(ps.client.callAPI(req))
445+
_, err = rs.Client.HandleResponse(rs.Client.callAPI(req))
475446
return err
476447
}
477448

clients/gofs/client.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,10 @@ func (c *fsClient) Run(ctx context.Context) error {
272272
select {
273273
case <-ctx.Done():
274274
return ctx.Err()
275-
case e := <-eventChan:
275+
case e, ok := <-eventChan:
276+
if !ok {
277+
return nil
278+
}
276279
switch e.Type {
277280
case rpc.FunctionEventType_DEPLOY:
278281
mf, ok := c.modules[e.Function.Module]

cmd/client/apply.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package client
2+
3+
import (
4+
"fmt"
5+
"github.com/functionstream/function-stream/cmd/client/common"
6+
"github.com/spf13/cobra"
7+
"os"
8+
)
9+
10+
func NewApplyCmd() *cobra.Command {
11+
filePath := ""
12+
cmd := &cobra.Command{
13+
Use: "apply",
14+
Short: "apply resources",
15+
RunE: func(cmd *cobra.Command, args []string) error {
16+
if filePath == "" {
17+
return fmt.Errorf("file is required")
18+
}
19+
data, err := os.ReadFile(filePath)
20+
if err != nil {
21+
return fmt.Errorf("failed to read file: %w", err)
22+
}
23+
cmd.SilenceUsage = true
24+
resources, err := DecodeResource(data)
25+
if err != nil {
26+
return fmt.Errorf("failed to decode resources: %w", err)
27+
}
28+
29+
// TODO: support dry run
30+
31+
apiCLi := common.GetApiClient()
32+
33+
hasErr := false
34+
for _, r := range resources {
35+
if err := apiCLi.GenericService(r.Kind).Upsert(cmd.Context(), &r.Spec); err != nil {
36+
fmt.Printf("Failed to apply resource %s: %v\n", r.Metadata.Name, err)
37+
hasErr = true
38+
continue
39+
}
40+
fmt.Printf("Resource %s/%s applied\n", r.Kind, r.Metadata.Name)
41+
}
42+
if hasErr {
43+
return fmt.Errorf("some resources failed to apply")
44+
}
45+
return nil
46+
},
47+
}
48+
cmd.Flags().StringVarP(&filePath, "file", "f", "", "The path to the resources")
49+
return cmd
50+
}

cmd/client/cmd.go

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,6 @@ package client
1818

1919
import (
2020
c "github.com/functionstream/function-stream/cmd/client/common"
21-
"github.com/functionstream/function-stream/cmd/client/consume"
22-
"github.com/functionstream/function-stream/cmd/client/create"
23-
del "github.com/functionstream/function-stream/cmd/client/delete"
24-
"github.com/functionstream/function-stream/cmd/client/list"
25-
"github.com/functionstream/function-stream/cmd/client/produce"
26-
"github.com/functionstream/function-stream/cmd/client/reload"
2721
"github.com/spf13/cobra"
2822
)
2923

@@ -35,14 +29,18 @@ var (
3529
}
3630
)
3731

38-
func init() {
39-
Cmd.PersistentFlags().StringVarP(&c.Config.ServiceAddr, "service-address", "s",
40-
"http://localhost:7300", "Service address")
41-
42-
Cmd.AddCommand(create.Cmd)
43-
Cmd.AddCommand(list.Cmd)
44-
Cmd.AddCommand(del.Cmd)
45-
Cmd.AddCommand(produce.Cmd)
46-
Cmd.AddCommand(consume.Cmd)
47-
Cmd.AddCommand(reload.Cmd)
32+
func NewClientCmd() *cobra.Command {
33+
cmd := &cobra.Command{
34+
Use: "client",
35+
Short: "Function Stream Client Tool",
36+
Long: `Operations to manage functions in a function stream server`,
37+
}
38+
cmd.PersistentFlags().StringVarP(&c.Config.ServiceAddr, "service-address", "s",
39+
"localhost:7300", "Service address")
40+
cmd.AddCommand(NewCreateCmd())
41+
cmd.AddCommand(NewGetCommand())
42+
cmd.AddCommand(NewListCommand())
43+
cmd.AddCommand(NewDeleteCommand())
44+
cmd.AddCommand(NewApplyCmd())
45+
return cmd
4846
}

cmd/client/common/config.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,17 @@
1616

1717
package common
1818

19+
import "github.com/functionstream/function-stream/apiclient"
20+
1921
type ClientConfig struct {
2022
ServiceAddr string
2123
}
2224

2325
var Config ClientConfig
26+
27+
func GetApiClient() *apiclient.APIClient {
28+
apiCliCfg := apiclient.NewConfiguration()
29+
apiCliCfg.Debug = true // TODO: support configuring debug
30+
apiCliCfg.Host = Config.ServiceAddr
31+
return apiclient.NewAPIClient(apiCliCfg)
32+
}

cmd/client/create.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package client
2+
3+
import (
4+
"fmt"
5+
"github.com/functionstream/function-stream/cmd/client/common"
6+
"github.com/spf13/cobra"
7+
"os"
8+
)
9+
10+
func NewCreateCmd() *cobra.Command {
11+
filePath := ""
12+
cmd := &cobra.Command{
13+
Use: "create",
14+
Short: "Create resources",
15+
RunE: func(cmd *cobra.Command, args []string) error {
16+
if filePath == "" {
17+
return fmt.Errorf("file is required")
18+
}
19+
data, err := os.ReadFile(filePath)
20+
if err != nil {
21+
return fmt.Errorf("failed to read file: %w", err)
22+
}
23+
cmd.SilenceUsage = true
24+
resources, err := DecodeResource(data)
25+
if err != nil {
26+
return fmt.Errorf("failed to decode resources: %w", err)
27+
}
28+
29+
// TODO: support dry run
30+
31+
apiCLi := common.GetApiClient()
32+
33+
hasErr := false
34+
for _, r := range resources {
35+
if err := apiCLi.GenericService(r.Kind).Create(cmd.Context(), &r.Spec); err != nil {
36+
fmt.Printf("Failed to create resource %s: %v\n", r.Metadata.Name, err)
37+
hasErr = true
38+
continue
39+
}
40+
fmt.Printf("Resource %s/%s created\n", r.Kind, r.Metadata.Name)
41+
}
42+
if hasErr {
43+
return fmt.Errorf("some resources failed to create")
44+
}
45+
return nil
46+
},
47+
}
48+
cmd.Flags().StringVarP(&filePath, "file", "f", "", "The path to the resources")
49+
return cmd
50+
}

cmd/client/delete.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package client
2+
3+
import (
4+
"fmt"
5+
"github.com/functionstream/function-stream/cmd/client/common"
6+
"github.com/spf13/cobra"
7+
)
8+
9+
func NewDeleteCommand() *cobra.Command {
10+
cmd := &cobra.Command{
11+
Use: "delete",
12+
Short: "delete resources",
13+
RunE: func(cmd *cobra.Command, args []string) error {
14+
res := args[0]
15+
name := args[1]
16+
apiCLi := common.GetApiClient()
17+
18+
err := apiCLi.GenericService(res).Delete(cmd.Context(), name)
19+
if err != nil {
20+
return err
21+
}
22+
fmt.Printf("Resource %s/%s deleted\n", res, name)
23+
return nil
24+
},
25+
}
26+
cmd.Args = cobra.ExactArgs(2)
27+
return cmd
28+
}

0 commit comments

Comments
 (0)