diff --git a/cmd/tuple/delete.go b/cmd/tuple/delete.go index 9f736bb7..04744e10 100644 --- a/cmd/tuple/delete.go +++ b/cmd/tuple/delete.go @@ -49,6 +49,23 @@ var deleteCmd = &cobra.Command{ } if fileName != "" { startTime := time.Now() + successPath, _ := cmd.Flags().GetString("success-log") + failurePath, _ := cmd.Flags().GetString("failure-log") + var successLogger, failureLogger *tuple.TupleLogger + if successPath != "" { + successLogger, err = tuple.NewTupleLogger(successPath) + if err != nil { + return err + } + defer successLogger.Close() + } + if failurePath != "" { + failureLogger, err = tuple.NewTupleLogger(failurePath) + if err != nil { + return err + } + defer failureLogger.Close() + } clientTupleKeys, err := tuplefile.ReadTupleFile(fileName) if err != nil { @@ -70,8 +87,10 @@ var deleteCmd = &cobra.Command{ Deletes: clientTupleKeyWithoutCondition, } + newCtx := tuple.WithSuccessLogger(cmd.Context(), successLogger) + newCtx = tuple.WithFailureLogger(newCtx, failureLogger) response, err := tuple.ImportTuplesWithoutRampUp( - cmd.Context(), fgaClient, + newCtx, fgaClient, maxTuplesPerWrite, maxParallelRequests, writeRequest) if err != nil { @@ -83,11 +102,11 @@ var deleteCmd = &cobra.Command{ outputResponse := make(map[string]interface{}) - if !hideImportedTuples && len(response.Successful) > 0 { + if !hideImportedTuples && successPath == "" && len(response.Successful) > 0 { outputResponse["successful"] = response.Successful } - if len(response.Failed) > 0 { + if failurePath == "" && len(response.Failed) > 0 { outputResponse["failed"] = response.Failed } diff --git a/cmd/tuple/tuple.go b/cmd/tuple/tuple.go index d5af2254..11595db4 100644 --- a/cmd/tuple/tuple.go +++ b/cmd/tuple/tuple.go @@ -38,6 +38,8 @@ func init() { TupleCmd.AddCommand(deleteCmd) TupleCmd.PersistentFlags().String("store-id", "", "Store ID") + TupleCmd.PersistentFlags().String("success-log", "", "Filepath to log successful writes") + TupleCmd.PersistentFlags().String("failure-log", "", "Filepath to log failed writes") err := TupleCmd.MarkPersistentFlagRequired("store-id") if err != nil { //nolint:wsl diff --git a/cmd/tuple/write.go b/cmd/tuple/write.go index 3b9b969e..2b42529c 100644 --- a/cmd/tuple/write.go +++ b/cmd/tuple/write.go @@ -233,6 +233,26 @@ func writeTuplesFromFile(ctx context.Context, flags *flag.FlagSet, fgaClient *cl return fmt.Errorf("failed to parse debug flag due to %w", err) } + successPath, _ := flags.GetString("success-log") + failurePath, _ := flags.GetString("failure-log") + + var successLogger, failureLogger *tuple.TupleLogger + if successPath != "" { + successLogger, err = tuple.NewTupleLogger(successPath) + if err != nil { + return err + } + defer successLogger.Close() + } + + if failurePath != "" { + failureLogger, err = tuple.NewTupleLogger(failurePath) + if err != nil { + return err + } + defer failureLogger.Close() + } + tuples, err := tuplefile.ReadTupleFile(fileName) if err != nil { return err //nolint:wrapcheck @@ -243,6 +263,8 @@ func writeTuplesFromFile(ctx context.Context, flags *flag.FlagSet, fgaClient *cl } newCtx := utils.WithDebugContext(ctx, debug) + newCtx = tuple.WithSuccessLogger(newCtx, successLogger) + newCtx = tuple.WithFailureLogger(newCtx, failureLogger) response, err := tuple.ImportTuples( newCtx, fgaClient, @@ -257,11 +279,11 @@ func writeTuplesFromFile(ctx context.Context, flags *flag.FlagSet, fgaClient *cl outputResponse := make(map[string]interface{}) - if !hideImportedTuples && len(response.Successful) > 0 { + if !hideImportedTuples && successPath == "" && len(response.Successful) > 0 { outputResponse["successful"] = response.Successful } - if len(response.Failed) > 0 { + if failurePath == "" && len(response.Failed) > 0 { outputResponse["failed"] = response.Failed } diff --git a/internal/tuple/import.go b/internal/tuple/import.go index 4a1c61a5..d5424ae0 100644 --- a/internal/tuple/import.go +++ b/internal/tuple/import.go @@ -140,7 +140,7 @@ func importTuplesWithoutRampUp( return nil, fmt.Errorf("failed to import tuples due to %w", err) } - successful, failed := processWritesAndDeletes(response) + successful, failed := processWritesAndDeletes(ctx, response) result := ImportResponse{ Successful: successful, Failed: failed, @@ -215,8 +215,8 @@ func importTuplesWithRampUp(ctx context.Context, fgaClient client.SdkClient, return err //nolint:wrapcheck } - successfulWrites, failedWrites := processWrites(response.Writes) - successfulDeletes, failedDeletes := processDeletes(response.Deletes) + successfulWrites, failedWrites := processWrites(ctx, response.Writes) + successfulDeletes, failedDeletes := processDeletes(ctx, response.Deletes) mutex.Lock() @@ -298,15 +298,17 @@ func extractErrMsg(err error) string { } func processWritesAndDeletes( + ctx context.Context, response *client.ClientWriteResponse, ) ([]client.ClientTupleKey, []failedWriteResponse) { - successfulWrites, failedWrites := processWrites(response.Writes) - successfulDeletes, failedDeletes := processDeletes(response.Deletes) + successfulWrites, failedWrites := processWrites(ctx, response.Writes) + successfulDeletes, failedDeletes := processDeletes(ctx, response.Deletes) return append(successfulWrites, successfulDeletes...), append(failedWrites, failedDeletes...) } func processWrites( + ctx context.Context, writes []client.ClientWriteRequestWriteResponse, ) ([]client.ClientTupleKey, []failedWriteResponse) { var ( @@ -314,15 +316,25 @@ func processWrites( failedWrites []failedWriteResponse ) + successLogger := getSuccessLogger(ctx) + failureLogger := getFailureLogger(ctx) + for _, write := range writes { if write.Status == client.SUCCESS { successfulWrites = append(successfulWrites, write.TupleKey) + if successLogger != nil { + successLogger.LogSuccess(write.TupleKey) + } } else { reason := extractErrMsg(write.Error) - failedWrites = append(failedWrites, failedWriteResponse{ + failed := failedWriteResponse{ TupleKey: write.TupleKey, Reason: reason, - }) + } + failedWrites = append(failedWrites, failed) + if failureLogger != nil { + failureLogger.LogFailure(write.TupleKey) + } } } @@ -330,6 +342,7 @@ func processWrites( } func processDeletes( + ctx context.Context, deletes []client.ClientWriteRequestDeleteResponse, ) ([]client.ClientTupleKey, []failedWriteResponse) { var ( @@ -337,6 +350,9 @@ func processDeletes( failedDeletes []failedWriteResponse ) + successLogger := getSuccessLogger(ctx) + failureLogger := getFailureLogger(ctx) + for _, del := range deletes { deletedTupleKey := openfga.TupleKey{ Object: del.TupleKey.Object, @@ -346,12 +362,19 @@ func processDeletes( if del.Status == client.SUCCESS { successfulDeletes = append(successfulDeletes, deletedTupleKey) + if successLogger != nil { + successLogger.LogSuccess(deletedTupleKey) + } } else { reason := extractErrMsg(del.Error) - failedDeletes = append(failedDeletes, failedWriteResponse{ + failed := failedWriteResponse{ TupleKey: deletedTupleKey, Reason: reason, - }) + } + failedDeletes = append(failedDeletes, failed) + if failureLogger != nil { + failureLogger.LogFailure(deletedTupleKey) + } } } diff --git a/internal/tuple/import_test.go b/internal/tuple/import_test.go index b73dbf91..a17dabde 100644 --- a/internal/tuple/import_test.go +++ b/internal/tuple/import_test.go @@ -1,6 +1,7 @@ package tuple import ( + "context" "errors" "testing" @@ -23,7 +24,7 @@ func TestProcessWrites(t *testing.T) { }, } - successful, failed := processWrites(writes) + successful, failed := processWrites(context.Background(), writes) assert.Len(t, successful, 1) assert.Len(t, failed, 1) @@ -45,7 +46,7 @@ func TestProcessDeletes(t *testing.T) { }, } - successful, failed := processDeletes(deletes) + successful, failed := processDeletes(context.Background(), deletes) assert.Len(t, successful, 1) assert.Len(t, failed, 1) diff --git a/internal/tuple/logger.go b/internal/tuple/logger.go new file mode 100644 index 00000000..961f116f --- /dev/null +++ b/internal/tuple/logger.go @@ -0,0 +1,183 @@ +package tuple + +import ( + "bufio" + "context" + "encoding/csv" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/openfga/go-sdk/client" + "gopkg.in/yaml.v3" +) + +// TupleLogger writes tuple logs in various formats. +type TupleLogger struct { + file *os.File + writer *bufio.Writer + format string + headerWritten bool +} + +func splitUser(user string) (string, string, string) { + relation := "" + if parts := strings.Split(user, "#"); len(parts) > 1 { + relation = parts[1] + user = parts[0] + } + userParts := strings.SplitN(user, ":", 2) + userType := userParts[0] + userID := "" + if len(userParts) > 1 { + userID = userParts[1] + } + return userType, userID, relation +} + +func splitObject(object string) (string, string) { + parts := strings.SplitN(object, ":", 2) + objectType := parts[0] + objectID := "" + if len(parts) > 1 { + objectID = parts[1] + } + return objectType, objectID +} + +func toCSVRecord(key client.ClientTupleKey) []string { + uType, uID, uRel := splitUser(key.User) + oType, oID := splitObject(key.Object) + + var condName string + var condContext string + if key.Condition != nil { + condName = key.Condition.Name + if key.Condition.Context != nil { + b, _ := json.Marshal(*key.Condition.Context) + condContext = string(b) + } + } + + return []string{uType, uID, uRel, key.Relation, oType, oID, condName, condContext} +} + +// NewTupleLogger creates a logger for the given file path. +func NewTupleLogger(path string) (*TupleLogger, error) { + f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600) + if err != nil { + return nil, fmt.Errorf("failed to open log file %s: %w", path, err) + } + + info, err := f.Stat() + if err != nil { + _ = f.Close() + return nil, fmt.Errorf("failed to stat log file %s: %w", path, err) + } + + return &TupleLogger{ + file: f, + writer: bufio.NewWriter(f), + format: strings.ToLower(filepath.Ext(path)), + headerWritten: info.Size() > 0, + }, nil +} + +func (l *TupleLogger) Close() error { + if l == nil { + return nil + } + _ = l.writer.Flush() + _ = l.file.Sync() + return l.file.Close() +} + +func (l *TupleLogger) flush() { + _ = l.writer.Flush() + _ = l.file.Sync() +} + +// LogSuccess writes a successful tuple key. +func (l *TupleLogger) LogSuccess(key client.ClientTupleKey) { + if l == nil { + return + } + switch l.format { + case ".csv": + l.writeCSV(toCSVRecord(key)) + case ".yaml", ".yml": + record := []client.ClientTupleKey{key} + b, _ := yaml.Marshal(record) + l.writer.Write(b) + default: // json and jsonl + b, _ := json.Marshal(key) + l.writer.Write(append(b, '\n')) + } + l.flush() +} + +// LogFailure writes a failed tuple key. +func (l *TupleLogger) LogFailure(key client.ClientTupleKey) { + if l == nil { + return + } + switch l.format { + case ".csv": + l.writeCSV(toCSVRecord(key)) + case ".yaml", ".yml": + record := []client.ClientTupleKey{key} + b, _ := yaml.Marshal(record) + l.writer.Write(b) + default: + b, _ := json.Marshal(key) + l.writer.Write(append(b, '\n')) + } + l.flush() +} + +func (l *TupleLogger) writeCSV(record []string) { + w := csv.NewWriter(l.writer) + if !l.headerWritten { + header := []string{ + "user_type", + "user_id", + "user_relation", + "relation", + "object_type", + "object_id", + "condition_name", + "condition_context", + } + _ = w.Write(header) + l.headerWritten = true + } + _ = w.Write(record) + w.Flush() +} + +// Context utilities + +type logKey struct{ name string } + +var successLogKey = &logKey{"successLog"} +var failureLogKey = &logKey{"failureLog"} + +func WithSuccessLogger(ctx context.Context, l *TupleLogger) context.Context { + return context.WithValue(ctx, successLogKey, l) +} + +func WithFailureLogger(ctx context.Context, l *TupleLogger) context.Context { + return context.WithValue(ctx, failureLogKey, l) +} + +func getSuccessLogger(ctx context.Context) *TupleLogger { + logger, _ := ctx.Value(successLogKey).(*TupleLogger) + return logger +} + +func getFailureLogger(ctx context.Context) *TupleLogger { + logger, _ := ctx.Value(failureLogKey).(*TupleLogger) + return logger +} diff --git a/internal/tuple/logger_test.go b/internal/tuple/logger_test.go new file mode 100644 index 00000000..0aba27ee --- /dev/null +++ b/internal/tuple/logger_test.go @@ -0,0 +1,128 @@ +package tuple + +import ( + "bytes" + "encoding/csv" + "encoding/json" + "os" + "testing" + + openfga "github.com/openfga/go-sdk" + "github.com/openfga/go-sdk/client" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v3" +) + +func TestTupleLoggerCSVIncludesAllColumns(t *testing.T) { + t.Parallel() + + tmp, err := os.CreateTemp(t.TempDir(), "log*.csv") + require.NoError(t, err) + tmp.Close() + + logger, err := NewTupleLogger(tmp.Name()) + require.NoError(t, err) + defer logger.Close() + + key := client.ClientTupleKey{ + User: "user:anne", + Relation: "viewer", + Object: "document:1", + } + + logger.LogSuccess(key) + require.NoError(t, logger.Close()) + + data, err := os.ReadFile(tmp.Name()) + require.NoError(t, err) + + r := csv.NewReader(bytes.NewReader(data)) + records, err := r.ReadAll() + require.NoError(t, err) + require.Len(t, records, 2) + + header := []string{"user_type", "user_id", "user_relation", "relation", "object_type", "object_id", "condition_name", "condition_context"} + require.Equal(t, header, records[0]) + + expected := []string{"user", "anne", "", "viewer", "document", "1", "", ""} + require.Equal(t, expected, records[1]) +} + +func TestTupleLoggerFormatsSuccessAndFailure(t *testing.T) { + t.Parallel() + + condCtx := map[string]interface{}{"ip_addr": "10.0.0.1"} + key := client.ClientTupleKey{ + User: "user:anne10", + Relation: "owner", + Object: "group:foo", + Condition: &openfga.RelationshipCondition{ + Name: "inOfficeIP", + Context: &condCtx, + }, + } + + formats := []string{".csv", ".json", ".jsonl", ".yaml"} + for _, ext := range formats { + ext := ext + t.Run("success"+ext, func(t *testing.T) { + tmp, err := os.CreateTemp(t.TempDir(), "success*"+ext) + require.NoError(t, err) + tmp.Close() + + logger, err := NewTupleLogger(tmp.Name()) + require.NoError(t, err) + logger.LogSuccess(key) + require.NoError(t, logger.Close()) + + verifyLoggedTuple(t, tmp.Name(), ext, key) + }) + + t.Run("failure"+ext, func(t *testing.T) { + tmp, err := os.CreateTemp(t.TempDir(), "failure*"+ext) + require.NoError(t, err) + tmp.Close() + + logger, err := NewTupleLogger(tmp.Name()) + require.NoError(t, err) + logger.LogFailure(key) + require.NoError(t, logger.Close()) + + verifyLoggedTuple(t, tmp.Name(), ext, key) + }) + } +} + +func verifyLoggedTuple(t *testing.T, path, ext string, expected client.ClientTupleKey) { + t.Helper() + + data, err := os.ReadFile(path) + require.NoError(t, err) + + switch ext { + case ".csv": + r := csv.NewReader(bytes.NewReader(data)) + records, err := r.ReadAll() + require.NoError(t, err) + require.Len(t, records, 2) + header := []string{"user_type", "user_id", "user_relation", "relation", "object_type", "object_id", "condition_name", "condition_context"} + require.Equal(t, header, records[0]) + expectedRow := []string{"user", "anne10", "", "owner", "group", "foo", "inOfficeIP", "{\"ip_addr\":\"10.0.0.1\"}"} + require.Equal(t, expectedRow, records[1]) + case ".json": + var got client.ClientTupleKey + require.NoError(t, json.Unmarshal(data, &got)) + require.Equal(t, expected, got) + case ".jsonl": + var got client.ClientTupleKey + require.NoError(t, json.Unmarshal(bytes.TrimSpace(data), &got)) + require.Equal(t, expected, got) + case ".yaml", ".yml": + var got []client.ClientTupleKey + require.NoError(t, yaml.Unmarshal(data, &got)) + require.Len(t, got, 1) + require.Equal(t, expected, got[0]) + default: + t.Fatalf("unknown extension %s", ext) + } +}