Skip to content

Commit

Permalink
support deallocated page reuse after relaunch: existing testcases pas…
Browse files Browse the repository at this point in the history
…sed (several logging needed points are not implemented yet and modification specific testcases does not exist)
  • Loading branch information
ryogrid committed Jun 22, 2024
1 parent d664531 commit ded563f
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 15 deletions.
9 changes: 7 additions & 2 deletions lib/recovery/log_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ func (log_manager *LogManager) AppendLogRecord(log_record *LogRecord) types.LSN
log_manager.latch.WLock()
copy(log_manager.log_buffer[log_manager.offset:], log_record.GetLogHeaderData())
}
log_manager.log_buffer_lsn = log_record.Lsn
if log_record.Lsn != -1 {
log_manager.log_buffer_lsn = log_record.Lsn
}

pos := log_manager.offset + HEADER_SIZE
log_manager.offset += log_record.Size

Expand Down Expand Up @@ -166,17 +169,19 @@ func (log_manager *LogManager) AppendLogRecord(log_record *LogRecord) types.LSN
copy(log_manager.log_buffer[pos:], prevPageIdInBytes)
pos += uint32(unsafe.Sizeof(log_record.Prev_page_id))
buf2 := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, log_record.Page_id)
binary.Write(buf2, binary.LittleEndian, log_record.Page_id)
pageIdInBytes := buf2.Bytes()
copy(log_manager.log_buffer[pos:], pageIdInBytes)
} else if log_record.Log_record_type == DEALLOCATE_PAGE {
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, log_record.Deallocate_page_id)
//pos += uint32(unsafe.Sizeof(log_record.Deallocate_page_id))
pageIdInBytes := buf.Bytes()
copy(log_manager.log_buffer[pos:], pageIdInBytes)
} else if log_record.Log_record_type == REUSE_PAGE {
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, log_record.Reuse_page_id)
//pos += uint32(unsafe.Sizeof(log_record.Reuse_page_id))
pageIdInBytes := buf.Bytes()
copy(log_manager.log_buffer[pos:], pageIdInBytes)
} else if log_record.Log_record_type == GRACEFUL_SHUTDOWN {
Expand Down
5 changes: 4 additions & 1 deletion lib/recovery/log_recovery/log_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,17 @@ func (log_recovery *LogRecovery) DeserializeLogRecord(data []byte, log_record *r
log_record.Old_tuple.DeserializeFrom(data[pos:])
pos += log_record.Old_tuple.Size() + uint32(tuple.TupleSizeOffsetInLogrecord)
log_record.New_tuple.DeserializeFrom(data[pos:])
pos += log_record.New_tuple.Size() + uint32(tuple.TupleSizeOffsetInLogrecord)
} else if log_record.Log_record_type == recovery.NEW_TABLE_PAGE {
binary.Read(bytes.NewBuffer(data[pos:]), binary.LittleEndian, &log_record.Prev_page_id)
pos += uint32(unsafe.Sizeof(log_record.Prev_page_id))
binary.Read(bytes.NewBuffer(data[pos:]), binary.LittleEndian, &log_record.Page_id)
} else if log_record.Log_record_type == recovery.DEALLOCATE_PAGE {
binary.Read(bytes.NewBuffer(data[pos:]), binary.LittleEndian, &log_record.Deallocate_page_id)
pos += uint32(unsafe.Sizeof(log_record.Deallocate_page_id))
} else if log_record.Log_record_type == recovery.REUSE_PAGE {
binary.Read(bytes.NewBuffer(data[pos:]), binary.LittleEndian, &log_record.Reuse_page_id)
pos += uint32(unsafe.Sizeof(log_record.Reuse_page_id))
} else if log_record.Log_record_type == recovery.GRACEFUL_SHUTDOWN {
// do nothing
}
Expand Down Expand Up @@ -180,7 +183,7 @@ func (log_recovery *LogRecovery) Redo(txn *access.Transaction) (types.LSN, bool,
new_page := access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Page_id))
page_id = new_page.GetPageId()
// fmt.Printf("page_id: %d\n", page_id)
new_page.Init(page_id, log_record.Prev_page_id, log_recovery.log_manager, nil, txn)
new_page.Init(page_id, log_record.Prev_page_id, log_recovery.log_manager, nil, txn, true)
//log_recovery.buffer_pool_manager.FlushPage(page_id)
log_recovery.buffer_pool_manager.UnpinPage(page_id, true)
} else if log_record.Log_record_type == recovery.DEALLOCATE_PAGE {
Expand Down
20 changes: 15 additions & 5 deletions lib/recovery/recovery_test/log_recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,12 +247,22 @@ func TestUndo(t *testing.T) {

fmt.Println("Check if updated tuple values are effected before recovery")
var old_tuple2 *tuple.Tuple
old_tuple2, _ = test_table.GetTuple(rid2, txn)
fmt.Println("old_tuple2: ", old_tuple2.Data())

testingpkg.Assert(t, old_tuple2 != nil, "")
testingpkg.Assert(t, old_tuple2.GetValue(schema_, 0).CompareEquals(types.NewVarchar("updated")), "")
testingpkg.Assert(t, old_tuple2.GetValue(schema_, 1).CompareEquals(types.NewInteger(256)), "")
if new_rid == nil {
old_tuple2, _ = test_table.GetTuple(rid2, txn)
fmt.Println("old_tuple2: ", old_tuple2.Data())

testingpkg.Assert(t, old_tuple2 != nil, "")
testingpkg.Assert(t, old_tuple2.GetValue(schema_, 0).CompareEquals(types.NewVarchar("updated")), "")
testingpkg.Assert(t, old_tuple2.GetValue(schema_, 1).CompareEquals(types.NewInteger(256)), "")
} else {
old_tuple2, _ = test_table.GetTuple(new_rid, txn)
fmt.Println("old_tuple2: ", old_tuple2.Data())

testingpkg.Assert(t, old_tuple2 != nil, "")
testingpkg.Assert(t, old_tuple2.GetValue(schema_, 0).CompareEquals(types.NewVarchar("updated")), "")
testingpkg.Assert(t, old_tuple2.GetValue(schema_, 1).CompareEquals(types.NewInteger(256)), "")
}

fmt.Println("Check if inserted tuple exists before recovery")
old_tuple3, _ := test_table.GetTuple(rid3, txn)
Expand Down
3 changes: 3 additions & 0 deletions lib/samehada/samehada_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ func (si *SamehadaInstance) Shutdown(shutdownPat ShutdownPattern) {
case ShutdownPatternCloseFiles:
si.log_manager.Flush()
si.bpm.FlushAllDirtyPages()
logRecord := recovery.NewLogRecordGracefulShutdown()
si.log_manager.AppendLogRecord(logRecord)
si.log_manager.Flush()
// close only
si.disk_manager.ShutDown()
default:
Expand Down
4 changes: 2 additions & 2 deletions lib/storage/access/table_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewTableHeap(bpm *buffer.BufferPoolManager, log_manager *recovery.LogManage
log_manager.Flush()
}
firstPage.AddWLatchRecord(int32(txn.txn_id))
firstPage.Init(p.GetPageId(), types.InvalidPageID, log_manager, lock_manager, txn)
firstPage.Init(p.GetPageId(), types.InvalidPageID, log_manager, lock_manager, txn, false)
// flush page for recovery process works...
bpm.FlushPage(p.GetPageId())
bpm.UnpinPage(p.GetPageId(), true)
Expand Down Expand Up @@ -118,7 +118,7 @@ func (t *TableHeap) InsertTuple(tuple_ *tuple.Tuple, txn *Transaction, oid uint3
newPage.AddWLatchRecord(int32(txn.txn_id))
currentPage.SetNextPageId(p.GetPageId())
currentPageId := currentPage.GetPageId()
newPage.Init(p.GetPageId(), currentPageId, t.log_manager, t.lock_manager, txn)
newPage.Init(p.GetPageId(), currentPageId, t.log_manager, t.lock_manager, txn, false)
t.bpm.UnpinPage(currentPage.GetPageId(), true)
if common.EnableDebug && common.ActiveLogKindSetting&common.PIN_COUNT_ASSERT > 0 {
common.SH_Assert(currentPage.PinCount() == 0, "PinCount is not zero when finish TablePage::UpdateTuple!!!")
Expand Down
13 changes: 8 additions & 5 deletions lib/storage/access/table_page.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (tp *TablePage) RollbackDelete(rid *page.RID, txn *Transaction, log_manager
}

// Init initializes the table header
func (tp *TablePage) Init(pageId types.PageID, prevPageId types.PageID, log_manager *recovery.LogManager, lock_manager *LockManager, txn *Transaction) {
func (tp *TablePage) Init(pageId types.PageID, prevPageId types.PageID, log_manager *recovery.LogManager, lock_manager *LockManager, txn *Transaction, isForRedo bool) {
if common.EnableDebug {
defer func() {
if common.ActiveLogKindSetting&common.DEBUGGING > 0 {
Expand All @@ -466,9 +466,11 @@ func (tp *TablePage) Init(pageId types.PageID, prevPageId types.PageID, log_mana
}
tp.SetSerializedPageId(pageId)
tp.SetPrevPageId(prevPageId)
tp.SetNextPageId(types.InvalidPageID)
tp.SetTupleCount(0)
tp.SetFreeSpacePointer(common.PageSize) // point to the end of the page
if !isForRedo {
tp.SetNextPageId(types.InvalidPageID)
tp.SetTupleCount(0)
tp.SetFreeSpacePointer(common.PageSize) // point to the end of the page
}
}

// set value to Page::data memory area. not to Page::id
Expand Down Expand Up @@ -510,7 +512,8 @@ func (tp *TablePage) GetNextPageId() types.PageID {
}

func (tp *TablePage) GetTupleCount() uint32 {
return uint32(types.NewUInt32FromBytes(tp.Data()[offSetTupleCount:]))
ret := uint32(types.NewUInt32FromBytes(tp.Data()[offSetTupleCount:]))
return ret
}

func (tp *TablePage) GetTupleOffsetAtSlot(slot_num uint32) uint32 {
Expand Down

0 comments on commit ded563f

Please sign in to comment.