Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - activation: panic if post service exits unexpectedly #5300

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,6 @@ jobs:
- name: Add OpenCL support - Ubuntu
if: ${{ matrix.os == 'ubuntu-latest' }}
run: sudo apt-get update -q && sudo apt-get install -qy ocl-icd-opencl-dev libpocl2
- name: Override SDKROOT for macOS
if: ${{ contains(matrix.os, 'macos') && runner.arch == 'arm64' }}
run: echo "SDKROOT=/Library/Developer/CommandLineTools/SDKs/MacOSX12.3.sdk" >> $GITHUB_ENV
- name: disable Windows Defender - Windows
if: ${{ matrix.os == 'windows-latest' }}
run: |
Expand Down Expand Up @@ -182,9 +179,6 @@ jobs:
- name: Add OpenCL support - Ubuntu
if: ${{ matrix.os == 'ubuntu-latest' }}
run: sudo apt-get update -q && sudo apt-get install -qy ocl-icd-opencl-dev libpocl2
- name: Override SDKROOT for macOS
if: ${{ contains(matrix.os, 'macos') && runner.arch == 'arm64' }}
run: echo "SDKROOT=/Library/Developer/CommandLineTools/SDKs/MacOSX12.3.sdk" >> $GITHUB_ENV
- name: Add OpenCL support - Windows
if: ${{ matrix.os == 'windows-latest' }}
run: choco install opencl-intel-cpu-runtime
Expand Down
3 changes: 0 additions & 3 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ jobs:
- name: Add OpenCL support - Ubuntu
if: ${{ matrix.os == 'ubuntu-latest' }}
run: sudo apt-get update -q && sudo apt-get install -qy ocl-icd-opencl-dev libpocl2
- name: Override SDKROOT for macOS
if: ${{ contains(matrix.os, 'macos') && runner.arch == 'arm64' }}
run: echo "SDKROOT=/Library/Developer/CommandLineTools/SDKs/MacOSX12.3.sdk" >> $GITHUB_ENV
- name: disable Windows Defender - Windows
if: ${{ matrix.os == 'windows-latest' }}
run: |
Expand Down
2 changes: 1 addition & 1 deletion Makefile-libs.Inc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ else
endif
endif

POSTRS_SETUP_REV = 0.5.2
POSTRS_SETUP_REV = 0.6.1
POSTRS_SETUP_ZIP = libpost-$(platform)-v$(POSTRS_SETUP_REV).zip
POSTRS_SETUP_URL_ZIP ?= https://github.com/spacemeshos/post-rs/releases/download/v$(POSTRS_SETUP_REV)/$(POSTRS_SETUP_ZIP)
POSTRS_PROFILER_ZIP = profiler-$(platform)-v$(POSTRS_SETUP_REV).zip
Expand Down
148 changes: 73 additions & 75 deletions activation/post_supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
return PostSupervisorConfig{
PostServiceCmd: filepath.Join(filepath.Dir(path), DefaultPostServiceName),
NodeAddress: "http://127.0.0.1:9093",
MaxRetries: 10,
}
}

Expand All @@ -44,12 +45,14 @@
return PostSupervisorConfig{
PostServiceCmd: filepath.Join(filepath.Dir(string(path)), "build", DefaultPostServiceName),
NodeAddress: "http://127.0.0.1:9093",
MaxRetries: 10,
}
}

type PostSupervisorConfig struct {
PostServiceCmd string
NodeAddress string
MaxRetries int

CACert string
Cert string
Expand Down Expand Up @@ -167,8 +170,7 @@
return err
}

ps.eg.Go(func() error { return ps.runCmd(ctx, ps.cmdCfg, ps.postCfg, opts, ps.provingOpts) })
return nil
return ps.runCmd(ctx, ps.cmdCfg, ps.postCfg, opts, ps.provingOpts)
})
return nil
}
Expand Down Expand Up @@ -206,22 +208,13 @@
// it returns when the pipe is closed.
func (ps *PostSupervisor) captureCmdOutput(pipe io.ReadCloser) func() error {
return func() error {
buf := bufio.NewReader(pipe)
for {
line, err := buf.ReadString('\n')
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
line := scanner.Text()
line = strings.TrimRight(line, "\r\n") // remove line delimiters at end of input
switch err {
case nil:
ps.logger.Info(line)
case io.EOF:
ps.logger.Info(line)
return nil
default:
ps.logger.Info(line)
ps.logger.Warn("read from post service pipe", zap.Error(err))
return nil
}
ps.logger.Info(line)
}
return nil
}
}

Expand All @@ -232,67 +225,72 @@
postOpts PostSetupOpts,
provingOpts PostProvingOpts,
) error {
for {
args := []string{
"--address", cmdCfg.NodeAddress,

"--k1", strconv.FormatUint(uint64(postCfg.K1), 10),
"--k2", strconv.FormatUint(uint64(postCfg.K2), 10),
"--k3", strconv.FormatUint(uint64(postCfg.K3), 10),
"--pow-difficulty", postCfg.PowDifficulty.String(),

"--dir", postOpts.DataDir,
"-n", strconv.FormatUint(uint64(postOpts.Scrypt.N), 10),
"-r", strconv.FormatUint(uint64(postOpts.Scrypt.R), 10),
"-p", strconv.FormatUint(uint64(postOpts.Scrypt.P), 10),

"--threads", strconv.FormatUint(uint64(provingOpts.Threads), 10),
"--nonces", strconv.FormatUint(uint64(provingOpts.Nonces), 10),
"--randomx-mode", provingOpts.RandomXMode.String(),

"--watch-pid", strconv.Itoa(os.Getpid()),
}
if cmdCfg.CACert != "" {
args = append(args, "--ca-cert", cmdCfg.CACert)
}
if cmdCfg.Cert != "" {
args = append(args, "--cert", cmdCfg.Cert)
}
if cmdCfg.Key != "" {
args = append(args, "--key", cmdCfg.Key)
}
args := []string{
"--address", cmdCfg.NodeAddress,

"--min-num-units", strconv.FormatUint(uint64(postCfg.MinNumUnits), 10),
"--max-num-units", strconv.FormatUint(uint64(postCfg.MaxNumUnits), 10),
"--labels-per-unit", strconv.FormatUint(uint64(postCfg.LabelsPerUnit), 10),
"--k1", strconv.FormatUint(uint64(postCfg.K1), 10),
"--k2", strconv.FormatUint(uint64(postCfg.K2), 10),
"--k3", strconv.FormatUint(uint64(postCfg.K3), 10),
"--pow-difficulty", postCfg.PowDifficulty.String(),

"--dir", postOpts.DataDir,
"-n", strconv.FormatUint(uint64(postOpts.Scrypt.N), 10),
"-r", strconv.FormatUint(uint64(postOpts.Scrypt.R), 10),
"-p", strconv.FormatUint(uint64(postOpts.Scrypt.P), 10),

"--threads", strconv.FormatUint(uint64(provingOpts.Threads), 10),
"--nonces", strconv.FormatUint(uint64(provingOpts.Nonces), 10),
"--randomx-mode", provingOpts.RandomXMode.String(),

"--watch-pid", strconv.Itoa(os.Getpid()),
}
if cmdCfg.MaxRetries > 0 {
args = append(args, "--max-retries", strconv.Itoa(cmdCfg.MaxRetries))
}
if cmdCfg.CACert != "" {
args = append(args, "--ca-cert", cmdCfg.CACert)
}
if cmdCfg.Cert != "" {
args = append(args, "--cert", cmdCfg.Cert)
}
if cmdCfg.Key != "" {
args = append(args, "--key", cmdCfg.Key)
}

cmd := exec.CommandContext(
ctx,
cmdCfg.PostServiceCmd,
args...,
)
cmd.Dir = filepath.Dir(cmdCfg.PostServiceCmd)
pipe, err := cmd.StderrPipe()
if err != nil {
ps.logger.Error("setup stderr pipe for post service", zap.Error(err))
return nil
}
cmd := exec.CommandContext(
ctx,
cmdCfg.PostServiceCmd,
args...,
)
cmd.Dir = filepath.Dir(cmdCfg.PostServiceCmd)
pipe, err := cmd.StderrPipe()
if err != nil {
ps.logger.Error("setup stderr pipe for post service", zap.Error(err))
return nil
}

Check warning on line 273 in activation/post_supervisor.go

View check run for this annotation

Codecov / codecov/patch

activation/post_supervisor.go#L271-L273

Added lines #L271 - L273 were not covered by tests

var eg errgroup.Group
eg.Go(ps.captureCmdOutput(pipe))
if err := cmd.Start(); err != nil {
pipe.Close()
ps.logger.Error("start post service", zap.Error(err))
return nil
}
ps.logger.Info("post service started", zap.Int("pid", cmd.Process.Pid), zap.String("cmd", cmd.String()))
ps.pid.Store(int64(cmd.Process.Pid))
events.EmitPostServiceStarted()
err = cmd.Wait()
if err := ctx.Err(); err != nil {
events.EmitPostServiceStopped()
if err := eg.Wait(); err != nil {
ps.logger.Warn("output reading goroutine failed", zap.Error(err))
}
return nil
var eg errgroup.Group
eg.Go(ps.captureCmdOutput(pipe))
if err := cmd.Start(); err != nil {
pipe.Close()
ps.logger.Error("start post service", zap.Error(err))
return nil
}

Check warning on line 281 in activation/post_supervisor.go

View check run for this annotation

Codecov / codecov/patch

activation/post_supervisor.go#L278-L281

Added lines #L278 - L281 were not covered by tests
ps.logger.Info("post service started", zap.Int("pid", cmd.Process.Pid), zap.String("cmd", cmd.String()))
ps.pid.Store(int64(cmd.Process.Pid))
events.EmitPostServiceStarted()
err = cmd.Wait()
if ctx.Err() != nil {
events.EmitPostServiceStopped()
if err := eg.Wait(); err != nil {
ps.logger.Warn("output reading goroutine failed", zap.Error(err))

Check warning on line 289 in activation/post_supervisor.go

View check run for this annotation

Codecov / codecov/patch

activation/post_supervisor.go#L289

Added line #L289 was not covered by tests
}
ps.logger.Warn("post service exited", zap.Error(err))
eg.Wait()
return nil
}
eg.Wait()
ps.logger.Fatal("post service exited", zap.Error(err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes more sense to raise Fatal in the node, where the error will bubble up. There are several examples of this, for example the peersync process.

Copy link
Member Author

@fasmat fasmat Nov 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean just returning an error here and handling it with log.Fatal in node.go?

The problem is that the error here happens asynchronously and doesn't bubble up at the moment. Returning it here would do nothing (besides preventing the internal errgroup from accepting new goroutines).

return nil
}
45 changes: 37 additions & 8 deletions activation/post_supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ func Test_PostSupervisor_Restart_Possible(t *testing.T) {
require.Eventually(t, func() bool { return (ps.pid.Load() == 0) }, 5*time.Second, 100*time.Millisecond)
}

func Test_PostSupervisor_RestartsOnCrash(t *testing.T) {
log := zaptest.NewLogger(t)
func Test_PostSupervisor_LogFatalOnCrash(t *testing.T) {
log := zaptest.NewLogger(t, zaptest.WrapOptions(zap.WithFatalHook(calledFatal(t))))

cmdCfg := DefaultTestPostServiceConfig()
postCfg := DefaultPostConfig()
Expand All @@ -209,21 +209,50 @@ func Test_PostSupervisor_RestartsOnCrash(t *testing.T) {

require.Eventually(t, func() bool { return (ps.pid.Load() != 0) }, 5*time.Second, 100*time.Millisecond)

oldPid := int(ps.pid.Load())
process, err := os.FindProcess(oldPid)
pid := int(ps.pid.Load())
process, err := os.FindProcess(pid)
require.NoError(t, err)
require.NotNil(t, process)
require.NoError(t, process.Kill())

require.Eventually(t, func() bool { return (ps.pid.Load() != int64(oldPid)) }, 5*time.Second, 100*time.Millisecond)
// log asserts that zap.Fatal was called
require.NoError(t, ps.eg.Wait())
}

func Test_PostSupervisor_LogFatalOnInvalidConfig(t *testing.T) {
log := zaptest.NewLogger(t, zaptest.WrapOptions(zap.WithFatalHook(calledFatal(t))))

cmdCfg := DefaultTestPostServiceConfig()
cmdCfg.NodeAddress = "http://127.0.0.1:9099" // wrong port
cmdCfg.MaxRetries = 1 // speedup test, will fail on 2nd retry (~ 5s)
postCfg := DefaultPostConfig()
postOpts := DefaultPostSetupOpts()
provingOpts := DefaultPostProvingOpts()

ctrl := gomock.NewController(t)
mgr := NewMockpostSetupProvider(ctrl)
mgr.EXPECT().PrepareInitializer(postOpts).Return(nil)
mgr.EXPECT().StartSession(gomock.Any()).Return(nil)

sync := NewMocksyncer(ctrl)
sync.EXPECT().RegisterForATXSynced().DoAndReturn(closedChan)

ps, err := NewPostSupervisor(log.Named("supervisor"), cmdCfg, postCfg, provingOpts, mgr, sync)
require.NoError(t, err)
require.NotNil(t, ps)

require.NoError(t, ps.Start(postOpts))
t.Cleanup(func() { assert.NoError(t, ps.Stop(false)) })

require.Eventually(t, func() bool { return (ps.pid.Load() != 0) }, 5*time.Second, 100*time.Millisecond)

pid := int(ps.pid.Load())
process, err = os.FindProcess(pid)
process, err := os.FindProcess(pid)
require.NoError(t, err)
require.NotNil(t, process)

require.NotEqual(t, oldPid, pid)
require.NoError(t, ps.Stop(false))
// log asserts that zap.Fatal was called
require.NoError(t, ps.eg.Wait())
}

func Test_PostSupervisor_StopOnError(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/spacemeshos/go-scale v1.1.12
github.com/spacemeshos/merkle-tree v0.2.3
github.com/spacemeshos/poet v0.9.7
github.com/spacemeshos/post v0.10.1
github.com/spacemeshos/post v0.10.2
github.com/spf13/afero v1.10.0
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -649,8 +649,8 @@ github.com/spacemeshos/merkle-tree v0.2.3 h1:zGEgOR9nxAzJr0EWjD39QFngwFEOxfxMloE
github.com/spacemeshos/merkle-tree v0.2.3/go.mod h1:VomOcQ5pCBXz7goiWMP5hReyqOfDXGSKbrH2GB9Htww=
github.com/spacemeshos/poet v0.9.7 h1:FmKhgUKj//8Tzn8czWSIrn6+FVUFZbvLh8zqLfB8dfE=
github.com/spacemeshos/poet v0.9.7/go.mod h1:wGCdhs2jnfQ52Amcmygv9uEEwYpdHAPjbiPg0Uf6cNQ=
github.com/spacemeshos/post v0.10.1 h1:dFWB1xHa4Z+mDqmZjlQxBVn0e5gBs4QS/j50WXfHoac=
github.com/spacemeshos/post v0.10.1/go.mod h1:FPa130ioHforcqca0vqJrYjgUsYzTBwivy6lyLy7sKA=
github.com/spacemeshos/post v0.10.2 h1:FjRbrceno8xjBZi+53a+Xjoy81XormxDBvN0mN+0wxg=
github.com/spacemeshos/post v0.10.2/go.mod h1:lSZOkvCna1UuXgdGrvc+2SPMM+f+IUi9whZpBH7wUbM=
github.com/spacemeshos/sha256-simd v0.1.0 h1:G7Mfu5RYdQiuE+wu4ZyJ7I0TI74uqLhFnKblEnSpjYI=
github.com/spacemeshos/sha256-simd v0.1.0/go.mod h1:O8CClVIilId7RtuCMV2+YzMj6qjVn75JsxOxaE8vcfM=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
Expand Down