Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
inelpandzic committed Jan 19, 2025
1 parent 817ac6e commit dce27ea
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 16 deletions.
2 changes: 1 addition & 1 deletion log/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (i *iterator) Next() (*Record, error) {
i.page.Read(i.currentPos+intBytesSize, data)

rec := &Record{
length: int(length),
Length: int(length),
Data: data,
}

Expand Down
17 changes: 10 additions & 7 deletions log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func NewLogMgr(fm *file.FileMgr, logFile string) *LogMgr {
}

if logSize == 0 {
// initial offset is the block size
err := logPage.WriteInt(0, fm.BlockSize)
if err != nil {
panic(err)
Expand Down Expand Up @@ -65,7 +66,7 @@ func NewLogMgr(fm *file.FileMgr, logFile string) *LogMgr {
}

// Log logs the record to the log page.
func (lm *LogMgr) Log(record *Record) error {
func (lm *LogMgr) Log(record *Record) (int, error) {
lm.mu.Lock()
defer lm.mu.Unlock()

Expand All @@ -74,11 +75,13 @@ func (lm *LogMgr) Log(record *Record) error {

offset := endian.Uint32(offsetBytes) // offset where the last record starts

// Buffer capacity, the space left in the log page,
// is the offset minus the bytes that hold the offset itself.
buffCap := int(offset) - intBytesSize
if record.totalLength() > buffCap {
err := lm.Flush()
if err != nil {
return err
return 0, err
}

// We can reuse the existing log page and overwrite it with the new data,
Expand All @@ -90,32 +93,32 @@ func (lm *LogMgr) Log(record *Record) error {
lm.currentBlock.Number++
_, err = lm.fm.Write(lm.currentBlock, lm.logPage)
if err != nil {
return fmt.Errorf("failed to write log page: %w", err)
return 0, fmt.Errorf("failed to write log page: %w", err)
}

offset = uint32(lm.fm.BlockSize)
err = lm.logPage.WriteInt(0, int(offset))
if err != nil {
return fmt.Errorf("failed to write new offset: %w", err)
return 0, fmt.Errorf("failed to write new offset: %w", err)
}
}

recPos := int(offset) - record.totalLength()

_, err := lm.logPage.Write(recPos, record.bytes())
if err != nil {
return fmt.Errorf("failed to write record: %w", err)
return 0, fmt.Errorf("failed to write record: %w", err)
}

newOffset := recPos
err = lm.logPage.WriteInt(0, newOffset)
if err != nil {
return fmt.Errorf("failed to write new offset: %w", err)
return 0, fmt.Errorf("failed to write new offset: %w", err)
}

lm.latestLSN++

return nil
return lm.latestLSN, nil
}

// Iterator returns an iterator that iterates over the log records in reverse order.
Expand Down
16 changes: 12 additions & 4 deletions log/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,34 +72,42 @@ func TestLog(t *testing.T) {
record *Record
expectedLogSize int
expectedOffset int
expectedLSN int
}{
{
name: "test loging first record",
record: NewRecord([]byte("test record")),
expectedLogSize: 1,
expectedOffset: 17, // 32 (offset before the write) - 15 (4 bytes for length and 11 bytes for data)
expectedLSN: 1,
},
{
name: "test logging second record to be flushed to the same first block",
record: NewRecord([]byte("record 2")),
expectedLogSize: 1,
expectedOffset: 5, // 17 (offset before the write) - 12 (4 bytes for length and 8 bytes for data)
expectedLSN: 2,
},
{
name: "test logging third record to be flushed to the new seconf block",
name: "test logging third record to be flushed to the new second block",
record: NewRecord([]byte("record 3")),
expectedLogSize: 2,
expectedOffset: 20, // 32 (offset before the write) - 12 (4 bytes for length and 8 bytes for data)
expectedOffset: 20, // 32 (new block - offset before the write) - 12 (4 bytes for length and 8 bytes for data)
expectedLSN: 3,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := logMgr.Log(tt.record)
lsn, err := logMgr.Log(tt.record)
if err != nil {
t.Fatalf("Log failed: %v", err)
}

if lsn != tt.expectedLSN {
t.Errorf("lsn = %d, want %d", lsn, tt.expectedLSN)
}

offsetBytes := make([]byte, 4)
logMgr.logPage.Read(0, offsetBytes)
offset := binary.NativeEndian.Uint32(offsetBytes)
Expand Down Expand Up @@ -144,7 +152,7 @@ func TestIterator(t *testing.T) {
}

for _, record := range records {
err := logMgr.Log(record)
_, err := logMgr.Log(record)
if err != nil {
t.Fatalf("Log failed: %v", err)
}
Expand Down
8 changes: 4 additions & 4 deletions log/record.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
package log

type Record struct {
length int
Length int
Data []byte
}

func NewRecord(data []byte) *Record {
return &Record{
length: len(data),
Length: len(data),
Data: data,
}
}

// bytes returns whole record bytes, length 4-byte metadata field plus data.
func (r *Record) bytes() []byte {
lengthBytes := make([]byte, intBytesSize)
endian.PutUint32(lengthBytes, uint32(r.length))
endian.PutUint32(lengthBytes, uint32(r.Length))

return append(lengthBytes, r.Data...)
}

// totalLength returns the total length of the record, including the length 4-byte metadata field.
func (r *Record) totalLength() int {
return intBytesSize + r.length
return intBytesSize + r.Length
}

0 comments on commit dce27ea

Please sign in to comment.