Skip to content

Commit

Permalink
Loop sftp files and move on when they're missing
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Mar 27, 2024
1 parent a545036 commit fd3c6f7
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ All notable changes to this project will be documented in this file.
### Fixed

- The `unarchive` processor no longer yields linting errors when the format `csv:x` is specified. This is a regression introduced in v4.25.0.
- The `sftp` input will no longer consume files when the watcher cache returns an error. Instead, it will reattempt the file upon the next poll.

## 4.26.0 - 2024-03-18

Expand Down
56 changes: 36 additions & 20 deletions internal/impl/sftp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"os"
"sync"
"time"

Expand Down Expand Up @@ -179,26 +180,39 @@ func (s *sftpReader) Connect(ctx context.Context) (err error) {
}

var nextPath string
if nextPath, err = s.pathProvider.Next(ctx, s.client); err != nil {
if errors.Is(err, sftp.ErrSshFxConnectionLost) {
_ = s.client.Close()
s.client = nil
var file *sftp.File
for {
if nextPath, err = s.pathProvider.Next(ctx, s.client); err != nil {
if errors.Is(err, sftp.ErrSshFxConnectionLost) {
_ = s.client.Close()
s.client = nil
return
}
if errors.Is(err, errEndOfPaths) {
err = service.ErrEndOfInput
}
return
}
if errors.Is(err, errEndOfPaths) {
err = service.ErrEndOfInput
}
return
}

var file *sftp.File
if file, err = s.client.Open(nextPath); err != nil {
if errors.Is(err, sftp.ErrSshFxConnectionLost) {
_ = s.client.Close()
s.client = nil
if file, err = s.client.Open(nextPath); err != nil {
if errors.Is(err, sftp.ErrSshFxConnectionLost) {
_ = s.client.Close()
s.client = nil
}

s.log.With("path", nextPath, "err", err.Error()).Warn("Unable to open previously identified file")
if os.IsNotExist(err) {
// If we failed to open the file because it no longer exists
// then we can "ack" the path as we're done with it.
_ = s.pathProvider.Ack(ctx, nextPath, nil)
} else {
// Otherwise we "nack" it with the error as we'll want to
// reprocess it later.
_ = s.pathProvider.Ack(ctx, nextPath, err)
}
} else {
break
}
_ = s.pathProvider.Ack(ctx, nextPath, err)
return
}

if s.scanner, err = s.scannerCtor.Create(file, func(ctx context.Context, aErr error) (outErr error) {
Expand Down Expand Up @@ -371,9 +385,11 @@ func (w *watcherPathProvider) Next(ctx context.Context, client *sftp.Client) (st
// We process it if the marker is a pending symbol (!) and we're
// polling for the first time, or if the path isn't found in the
// cache.
// If we got an unexpected error obtaining a marker for this path
// from the cache then we skip that path because the watcher will
// eventually poll again, and the cache.Get operation will re-run.
//
// If we got an unexpected error obtaining a marker for this
// path from the cache then we skip that path because the
// watcher will eventually poll again, and the cache.Get
// operation will re-run.
if v, err := cache.Get(ctx, path); errors.Is(err, service.ErrKeyNotFound) || (!w.followUpPoll && string(v) == "!") {
w.expandedPaths = append(w.expandedPaths, path)
if err = cache.Set(ctx, path, []byte("!"), nil); err != nil {
Expand Down Expand Up @@ -403,7 +419,7 @@ func (w *watcherPathProvider) Ack(ctx context.Context, name string, err error) (
return
}

func (s *sftpReader) getFilePathProvider(ctx context.Context) pathProvider {
func (s *sftpReader) getFilePathProvider(_ context.Context) pathProvider {
if !s.watcherEnabled {
var filepaths []string
for _, p := range s.paths {
Expand Down
3 changes: 2 additions & 1 deletion internal/impl/sftp/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func TestIntegration(t *testing.T) {
Repository: "atmoz/sftp",
Tag: "alpine",
Cmd: []string{
sftpUsername + ":" + sftpPassword + ":1001:100:upload",
// https://github.com/atmoz/sftp/issues/401
"/bin/sh", "-c", "ulimit -n 65535 && exec /entrypoint " + sftpUsername + ":" + sftpPassword + ":1001:100:upload",
},
})
require.NoError(t, err)
Expand Down

0 comments on commit fd3c6f7

Please sign in to comment.