Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,12 @@ func CloneWithTmpDir(dir string) CloneOpt {
}
}

func CloneWithNodeTmpPath(dir string) CloneOpt {
return func(p *BaseProviders) {
p.params.NodeTmpPath = dir
}
}

func (b *BaseProviders) Clone(opts ...CloneOpt) *BaseProviders {
clone := *b

Expand Down
20 changes: 16 additions & 4 deletions pkg/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,17 @@ type File interface {
DownloadBytes(ctx context.Context, remotePath string) ([]byte, error)
}

// BundlerShouldInfoOutChecker
// if checker return empty string - output as Debug
// otherwise output with Info
type BundlerShouldInfoOutChecker func(string) string

type BundlerOptions struct {
StepHeaderRegex *regexp.Regexp
NoLogStepOutOnError bool
StepsDelimiter string
Retries int
StepHeaderRegex *regexp.Regexp
ShouldInfoOutChecker BundlerShouldInfoOutChecker
NoLogStepOutOnError bool
StepsDelimiter string
Retries int
}

func (o *BundlerOptions) IsValid() error {
Expand All @@ -147,6 +153,12 @@ func BundlerWithStepHeaderRegex(regex *regexp.Regexp) BundlerOption {
}
}

func BundlerWithShouldInfoOutChecker(checker BundlerShouldInfoOutChecker) BundlerOption {
return func(opts *BundlerOptions) {
opts.ShouldInfoOutChecker = checker
}
}

func BundlerWithNoLogStepOutOnError(v bool) BundlerOption {
return func(opts *BundlerOptions) {
opts.NoLogStepOutOnError = v
Expand Down
10 changes: 1 addition & 9 deletions pkg/ssh/clissh/upload-script.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,4 @@ func commandKiller(command connection.Command) {
_ = cliCmd.cmd.Process.Kill()
}

func commandPreparator(command connection.Command) {
cliCommand, ok := command.(*Command)
if !ok {
return
}

cliCommand.CaptureStdout(nil)
cliCommand.CaptureStderr(nil)
}
func commandPreparator(command connection.Command) {}
10 changes: 1 addition & 9 deletions pkg/ssh/gossh/upload-script.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,4 @@ func commandKiller(command connection.Command) {
_ = goCommand.session.Close()
}

func commandPreparator(command connection.Command) {
goCommand, ok := command.(*SSHCommand)
if !ok {
return
}

goCommand.CaptureStdout(nil)
goCommand.CaptureStderr(nil)
}
func commandPreparator(command connection.Command) {}
88 changes: 55 additions & 33 deletions pkg/ssh/local/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type Command struct {

used atomic.Bool

cmdMu sync.RWMutex
cmd *exec.Cmd

program string
args []string
sudo bool
Expand Down Expand Up @@ -89,10 +92,13 @@ func (c *Command) Run(ctx context.Context) error {
if err = cmd.Start(); err != nil {
return fmt.Errorf("cmd start failed: %v", err)
}

if c.onStart != nil {
c.onStart()
}

c.setCmd(cmd)

wg.Wait() // Wait for stdout/stderr reads to complete first
c.stdout = stdoutBuf.Bytes()
c.stderr = stderrBuf.Bytes()
Expand Down Expand Up @@ -174,38 +180,6 @@ func (c *Command) CombinedOutput(ctx context.Context) ([]byte, error) {
return output.Bytes(), nil
}

func (c *Command) prepareCmd(ctx context.Context) (*exec.Cmd, context.CancelFunc) {
bashBuiltins := []string{"bind", "type", "command", "let", "mapfile", "printf", "readarray", "ulimit"}

program := c.program
args := c.args
if c.sudo {
program = "sudo"
args = append([]string{c.program}, c.args...)
} else if slices.Contains(bashBuiltins, program) { // For shell built-in things we need to run bash
program = "bash"
args = []string{"-c", strings.Join(append([]string{c.program}, c.args...), " ")}
}

ctx, cancel := context.WithCancel(ctx)
if c.timeout > 0 {
cancel()
ctx, cancel = context.WithTimeout(ctx, c.timeout)
}

cmd := exec.CommandContext(ctx, program, args...)
if len(c.env) > 0 {
cmd.Env = os.Environ()
for k, v := range c.env {
cmd.Env = append(cmd.Env, k+"="+v)
}
}

c.settings.Logger().DebugF("Command prepared: %#v\n", cmd)

return cmd, cancel
}

func (c *Command) Sudo(_ context.Context) {
c.sudo = true
}
Expand Down Expand Up @@ -235,6 +209,54 @@ func (c *Command) StderrBytes() []byte {
}

// The rest are no-ops for local execution
func (c *Command) Cmd(_ context.Context) {}

func (c *Command) Cmd(_ context.Context) {}
func (c *Command) WithSSHArgs(_ ...string) {}

func (c *Command) setCmd(cmd *exec.Cmd) {
c.cmdMu.Lock()
defer c.cmdMu.Unlock()

c.cmd = cmd
}

func (c *Command) getCmd() *exec.Cmd {
c.cmdMu.RLock()
defer c.cmdMu.RUnlock()

return c.cmd
}

func (c *Command) prepareCmd(ctx context.Context) (*exec.Cmd, context.CancelFunc) {
bashBuiltins := []string{"bind", "type", "command", "let", "mapfile", "printf", "readarray", "ulimit"}

program := c.program
args := c.args
if c.sudo {
program = "sudo"
args = append([]string{c.program}, c.args...)
} else if slices.Contains(bashBuiltins, program) { // For shell built-in things we need to run bash
program = "bash"
args = []string{"-c", strings.Join(append([]string{c.program}, c.args...), " ")}
}

var cancel context.CancelFunc

if c.timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, c.timeout)
} else {
ctx, cancel = context.WithCancel(ctx)
}

cmd := exec.CommandContext(ctx, program, args...)
if len(c.env) > 0 {
cmd.Env = os.Environ()
for k, v := range c.env {
cmd.Env = append(cmd.Env, k+"="+v)
}
}

c.settings.Logger().DebugF("Command prepared: %#v\n", cmd)

return cmd, cancel
}
2 changes: 1 addition & 1 deletion pkg/ssh/local/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ func (n *NodeInterface) UploadScript(scriptPath string, args ...string) connecti
logger.DebugF("Starting NodeInterface.UploadScript")
defer logger.DebugF("Stop NodeInterface.UploadScript")

return NewScript(n.settings, scriptPath, args...)
return NewScript(n, scriptPath, args...)
}
123 changes: 97 additions & 26 deletions pkg/ssh/local/script.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ import (
"path/filepath"
"time"

"github.com/name212/govalue"

connection "github.com/deckhouse/lib-connection/pkg"
"github.com/deckhouse/lib-connection/pkg/settings"
"github.com/deckhouse/lib-connection/pkg/ssh/utils"
)

var (
Expand All @@ -34,20 +37,30 @@ var (
type Script struct {
settings settings.Settings

node *NodeInterface
scriptPath string
args []string
env map[string]string
sudo bool
stdoutLineHandler func(line string)
timeout time.Duration
cleanupAfterRun bool
bundlerOptions []connection.BundlerOption
noOutError bool
// bundleDest
// for test purposes
bundleDest string
// forceBundleNoSudo
// for test purposes
forceBundleNoSudo bool
}

func NewScript(sett settings.Settings, path string, args ...string) *Script {
func NewScript(node *NodeInterface, path string, args ...string) *Script {
return &Script{
node: node,
scriptPath: path,
args: args,
settings: sett,
settings: node.settings,
}
}

Expand Down Expand Up @@ -84,36 +97,30 @@ func (s *Script) Execute(ctx context.Context) ([]byte, error) {
return cmd.StdoutBytes(), nil
}

func (s *Script) WithBundlerOpts(opts ...connection.BundlerOption) {}
func (s *Script) WithBundlerOpts(opts ...connection.BundlerOption) {
s.bundlerOptions = append(make([]connection.BundlerOption, 0), opts...)
}

func (s *Script) ExecuteBundle(ctx context.Context, parentDir, bundleDir string) ([]byte, error) {
srcPath := filepath.Join(parentDir, bundleDir)
dstPath := filepath.Join("/var/lib/", bundleDir)
_ = os.RemoveAll(dstPath) // Cleanup from previous runs
if err := copyRecursively(srcPath, dstPath); err != nil {
return nil, fmt.Errorf("copy bundle to /var/lib/%s: %w", bundleDir, err)
opts, err := utils.UserBundleOptsOrBashible(s.bundlerOptions...)
if err != nil {
return nil, err
}

cmd := NewCommand(s.settings, filepath.Join("/var/lib", bundleDir, s.scriptPath), s.args...)
if s.timeout > 0 {
cmd.WithTimeout(s.timeout)
}
if s.env != nil {
cmd.WithEnv(s.env)
}
if s.stdoutLineHandler != nil {
cmd.WithStdoutHandler(s.stdoutLineHandler)
}
if s.sudo {
cmd.Sudo(ctx)
}
opts = append(opts,
utils.BundleWithNoLogStepOutOnError(s.noOutError),
utils.BundleWithCommandKiller(commandKiller),
utils.BundleWithCommandPreparator(s.commandPreparator),
)

if err := cmd.Run(ctx); err != nil {
s.settings.Logger().DebugF("Execute bundle failed: stdout: %s\n\nstderr: %s\n", cmd.StdoutBytes(), cmd.StderrBytes())
return nil, fmt.Errorf("Execute bundle failed: %w", err)
bundler, err := utils.NewBundle(s.settings, s.node, s.scriptPath, s.args, opts...)
if err != nil {
return nil, err
}

return cmd.StdoutBytes(), nil
bundler.WithCmdProvider(s.bundleCmdProvider)

return bundler.Execute(ctx, parentDir, bundleDir)
}

func (s *Script) Sudo() {
Expand All @@ -136,6 +143,70 @@ func (s *Script) WithCleanupAfterExec(doCleanup bool) {
s.cleanupAfterRun = doCleanup
}

func (s *Script) WithNoLogStepOutOnError(bool) {}
func (s *Script) WithNoLogStepOutOnError(f bool) {
s.noOutError = f
}

func (s *Script) WithExecuteUploadDir(string) {}

func (s *Script) bundleCmdProvider(ctx context.Context, node connection.Interface, parentDir, bundleDir string) (connection.Command, error) {
fullDest := s.getBundleFullDest(bundleDir)

srcPath := filepath.Join(parentDir, bundleDir)
_ = os.RemoveAll(fullDest) // Cleanup from previous runs
if err := copyRecursively(srcPath, fullDest); err != nil {
return nil, fmt.Errorf("copy bundle to %s: %w", fullDest, err)
}

cmd := NewCommand(s.settings, filepath.Join(fullDest, s.scriptPath), s.args...)

if s.timeout > 0 {
cmd.WithTimeout(s.timeout)
}

if s.env != nil {
cmd.WithEnv(s.env)
}

return cmd, nil
}

func (s *Script) getBundleFullDest(bundleDir string) string {
dest := "/var/lib"
if s.bundleDest != "" {
dest = s.bundleDest
}

return filepath.Join(dest, bundleDir)
}

func (s *Script) commandPreparator(command connection.Command) {
if !s.forceBundleNoSudo {
return
}

localCmd, ok := command.(*Command)
if !ok {
return
}

localCmd.sudo = false
}

func commandKiller(command connection.Command) {
localCmd, ok := command.(*Command)
if !ok {
return
}

cmd := localCmd.getCmd()
if govalue.Nil(cmd) {
return
}

if !govalue.Nil(cmd.ProcessState) && cmd.ProcessState.Exited() {
return
}

_ = cmd.Process.Kill()
}
Loading
Loading