Skip to content

project5 milestone1 : Concurrency Control Lock Manager

Sonny edited this page Feb 25, 2022 · 1 revision

Changed function

For the internal implementation of the function in lock acquire(),
return type was changed from lock_t* => int
the parameter was also added record_index

The locking mode, which is the return type of lock acquire(), is divided into two cases.

// RESULT OF LOCK ACQUIRE
#define IS_DEADLOCK 1024
#define SUCCESS 1025

The case of returning each type is as follows.
IS_DEADLOCK : When the lock request makes deadlock, abort that transaction and return this.
SUCCESS : successfully acquire the lock OR free pass situation.

4 type of lists in implementation.

  1. Bucket list : In lock table, each bucket is linked.
  2. Bucket lock list : Each bucket has lock list in boundary of page.
  3. record lock list : Each record has lock list for record lock request.
  4. trx lock list : Each transaction has its own lock objects.

Lock Mode & Lock state

When multiple transactions access the same record, first lock the record and access is made according to the state of the lock.

At this time, the lock mode is divided to allow multiple transactions to access, not just one transaction at a time.

There are two types of lock mode, and using this type of lock, we use them as one of the criteria for determining whether the two lock objects are conflict or not.

One is SHARED(S) and the other is EXCLUSIVE(X).

  • In the record lock list, the lock state of my lock object is determined according to the lock mode of the lock objects in front of me.

If record A : S(T1) - S(T2), the two rocks are not conform, so the two rocks are all in an integrated state.

If record A : S(T1) - X(T2), the two rocks are conflcit, so only the front Slock is in the interrogated state, and the following X Rock is in the waiting state.

If record A : X(T1) - S(T2), then Durak is conform, so only the front X lock is in the accumulated state, and the back S lock is in the waiting state.

If record A : X(T1) - X(T2), then Durak is conform, so only the front X lock is in the accumulated state, and the back S lock is in the waiting state.

In other words, if the type of one lock object is EXCLUSIVE, conflict occurs.

  • But I can't decide the state of the rock right away by looking at the type of rock right in front of me. An exception occurs if the lock in front of me is the lock of my transaction.

For example record A : X(T1) - S(T2) <== S(T2)

In a situation where S(T2) is added to the record lock list as in the above situation, the following S lock is a contrast to the first X(T1), so it must be in a waiting state.
However, if you compare only with S(T2) in front of you, you will be in a required state because it is not conflict.

Therefore, in order to determine the lock state of the new lock object in the record lock list, it is necessary to check whether there is a conflict lock object with new lock object in the current record lock list.

This was implemented using the functions below

pair<int, int> is_conflict_lock_in_record_lock_list(lock_t* pred_same_rid_lock_obj, int64_t table_id, pagenum_t page_id, int trx_id, int64_t record_id, int lock_mode, int record_index)
{

	// Case : There is conflict lock in record lock list
	while (pred_same_rid_lock_obj != NULL)
	{
		if (is_conflict_2(pred_same_rid_lock_obj, trx_id, record_id, lock_mode)) 
		{
			buf_page_latch_release(table_id, page_id);
			pthread_mutex_unlock(&trx_manager_latch);
			return make_pair(IS_CONFLICT_LOCK, -1);
		}
		pred_same_rid_lock_obj = pred_same_rid_lock_obj->same_rid_prev;
	}
	

	// There is not conflict lock
	buf_page_latch_release(table_id, page_id);
	pthread_mutex_unlock(&trx_manager_latch);
	return make_pair(NO_CONFLICT, -1);
}
bool is_conflict_1(lock_t* pred_lock_obj, lock_t* succ_lock_obj)
{
	if (pred_lock_obj == NULL || succ_lock_obj == NULL) return false;
	if (pred_lock_obj->owner_trx_id == succ_lock_obj->owner_trx_id) return false;
	if (pred_lock_obj->record_id != succ_lock_obj->record_id) return false;
	if (pred_lock_obj->lock_mode == SHARED && succ_lock_obj->lock_mode == SHARED) return false;

	return true;
}
bool is_conflict_2(lock_t* pred_lock_obj, int trx_id, int64_t record_id, int lock_mode)
{
	if (pred_lock_obj == NULL) return false;
	if (pred_lock_obj->owner_trx_id == trx_id) return false;
	if (pred_lock_obj->record_id != record_id) return false;
	if (pred_lock_obj->lock_mode == SHARED && lock_mode == SHARED) return false;

	return true;
}

So if there is no conflict locks in the record lock list, the lock state of my lock object becomes acquired and the record is accessed to proceed with the operation.

if (result_conflict_check.first == NO_CONFLICT)
{
    lock_t* new_lock_obj = make_new_lock_object(bucket, key, trx_id, lock_mode);

    // Append the new lock object behind of the record lock list. (update record lock list)		
    if (pred_same_rid_lock_obj != NULL)
    {
        pred_same_rid_lock_obj->same_rid_next = new_lock_obj;
	new_lock_obj->same_rid_prev = pred_same_rid_lock_obj;
    }
				
   // Case : There's no lock object in bucket lock list
   if (bucket->head == NULL)
   {
	bucket->head = new_lock_obj;
	bucket->tail = new_lock_obj;	
    }
    // Case : There are lock objects in front of the new lock object.
    else 
    {
	bucket->tail->next = new_lock_obj;
	new_lock_obj->prev = bucket->tail;
	bucket->tail = new_lock_obj;
	new_lock_obj->next = NULL;
    }	
    new_lock_obj->lock_state = ACQUIRED;			 
    pthread_mutex_unlock(&lock_manager_latch);
    return SUCCESS;
}

If a conflict lock exists, lock status become Waiting, and transaction status become Sleeping. Then check there is deadlock or not. Details of deadlock will be covered in the next section.

else if (result_conflict_check.first == IS_CONFLICT_LOCK 
{
	lock_t* new_lock_obj = make_new_lock_object(bucket, key, trx_id, lock_mode);	

	// Case : There are lock objects in front of the new lock object.
	bucket->tail->next = new_lock_obj;
	new_lock_obj->prev = bucket->tail;
	bucket->tail = new_lock_obj;
	new_lock_obj->next = NULL;
			
	// Append the new lock object behind of the record lock list. (update record lock list)		
	if (pred_same_rid_lock_obj != NULL)
	{
		pred_same_rid_lock_obj->same_rid_next = new_lock_obj;
		new_lock_obj->same_rid_prev = pred_same_rid_lock_obj;
	}

	new_lock_obj->lock_state = WAITING;
	trx_table[trx_id].trx_state = SLEEPING;
				
	pthread_mutex_lock(&trx_manager_latch);
	if (check_deadlock(new_lock_obj, record_index) == true)
	{
		trx_abort(trx_id);
		pthread_mutex_unlock(&trx_manager_latch);
		pthread_mutex_unlock(&lock_manager_latch);

		return IS_DEADLOCK;
	}
			
	pthread_mutex_unlock(&trx_manager_latch);
	pthread_cond_wait(&(new_lock_obj->cond), &lock_manager_latch);

	new_lock_obj->lock_state = ACQUIRED;
	trx_table[trx_id].trx_state = WORKING;

	pthread_mutex_unlock(&lock_manager_latch);
	return SUCCESS;
}
  • However, there are other things to consider besides the above situation.

Make sure that the transaction that made the lock request is already accepting the same or stronger lock type in that record before performing locking. ( X is stronger than S )

bool is_my_trx_already_acquired_same_or_stronger_lock_type(lock_Bucket* bucket, int64_t table_id, pagenum_t page_id, int64_t key, int trx_id, int lock_mode, int record_index)
{
	// Case : A transaction has already same or stronger lock in record lock list.
	lock_t* temp_lock_obj = bucket->head;

	// First, find the lock object with same record id (to seek record lock list)
	while (temp_lock_obj != NULL)
	{
		if (temp_lock_obj->record_id == key) break;
		temp_lock_obj = temp_lock_obj->next;

	}
	if (temp_lock_obj == NULL) return false;
	
	// Second, find whether my trx's already acquired same or stronger lock type or not.
	while (temp_lock_obj != NULL)
	{
		if (temp_lock_obj->owner_trx_id == trx_id && (temp_lock_obj->lock_mode >= lock_mode) && temp_lock_obj->lock_state == ACQUIRED)
			return true;
		
		temp_lock_obj = temp_lock_obj->same_rid_next;
	}
	if (temp_lock_obj == NULL) return false;
}

To this end if there is same or stronger my lock type, the lock acquire() ends without adding a lock to the record lock list. However, if it does not exist, add an explicit lock to the record lock list.

Deadlock Detection

If the lock is conflicted, becomes a waiting state, and the owner transaction of the lock is in a sleeping state, the deadlock is inspected.
Deadlock means a situation where the transaction in front of me that I am waiting for is waiting for me, and eventually I am waiting for me.
In my implementation, the functions used to find deadlocks are check_deadlock() and is_cycle().

  • check_deadlock()
bool check_deadlock(lock_t* new_lock_obj)
{
	std::map<int, trx_t>::iterator iter;
	for (iter = trx_table.begin(); iter != trx_table.end(); iter++)
		iter->second.visited = false;

	lock_t* pred_same_rid_lock_obj = new_lock_obj;
	while (pred_same_rid_lock_obj != NULL)
	{
		if (is_conflict(pred_same_rid_lock_obj, new_lock_obj) && trx_table[pred_same_rid_lock_obj->owner_trx_id].trx_state == SLEEPING )
		{
			if (is_cycle(pred_same_rid_lock_obj->owner_trx_id, new_lock_obj->owner_trx_id)) return true;
		}
		pred_same_rid_lock_obj = pred_same_rid_lock_obj->same_rid_prev;
	}
	return false;
}

The check_deadlock() checks that whether there is cycle which means deadlock.
To detect this, it calls is_cycle() for all conflicting and sleeping transactions of that conflicting locks in front of me in record lock list.
If there is a cycle, return true because it found deadlock.

  • is_cycle()
bool is_cycle(int suspicious_trx_id, int new_trx_id)
{
        if (trx_table[suspicious_trx_id].visited) return false;
	
	trx_table[suspicious_trx_id].visited = true;
	
	bool is_found = false;
	std::queue<int> q;
	q.push(suspicious_trx_id);
	while (!q.empty())
	{
		int qSize = q.size();
		for (int i = 0; i < qSize; i++)
		{
			int curr_trx_id = q.front();
			q.pop();
			trx_t& curr_trx_obj = trx_table[curr_trx_id];
			
			// Iterate trx_lock_list. (downward)
			lock_t* trx_lock_obj = curr_trx_obj.head_trx_lock_list;
			while (trx_lock_obj != NULL)
			{
				if (trx_lock_obj->lock_state == WAITING)
				{
					lock_t* prev_same_rid_lock_obj = trx_lock_obj->same_rid_prev;
					
					// Iterate record lock list (leftward)
					while (prev_same_rid_lock_obj != NULL)
					{
						// check waits for what
						if (is_conflict(prev_same_rid_lock_obj, trx_lock_obj) && trx_table[prev_same_rid_lock_obj->owner_trx_id].trx_state == SLEEPING)
						{

							// This means that there is cycle.
							if (prev_same_rid_lock_obj->owner_trx_id == new_trx_id) return true;
							
							trx_t& temp_trx_obj = trx_table[prev_same_rid_lock_obj->owner_trx_id];
							if (temp_trx_obj.visited != true)
							{
								q.push(prev_same_rid_lock_obj->owner_trx_id);
								temp_trx_obj.visited = true;
							}
						}
						prev_same_rid_lock_obj = prev_same_rid_lock_obj->same_rid_prev;
					}
				}
				trx_lock_obj = trx_lock_obj->trx_next_lock;
			}
		}
	}
	return false;
}

is_cycle() checks the trx_lock_list in the trx_obj which trx_id is suspicious_trx_id.
If there is a lock that waiting trx which trx_id is new_trx_id, then it means there is a cycle.
This algorithm is based on BFS.

So, after check_deadlock(), if it returns true, then the lock_acquire() call trx_abort(). I'll cover this the next section. Otherwise, the test return false, then the new lock object should wait until the front conflicting lock released and wakes it up.

new_lock_obj->lock_state = WAITING;
trx_table[trx_id].trx_state = SLEEPING;
				
pthread_mutex_lock(&trx_manager_latch);
if (check_deadlock(new_lock_obj, record_index) == true)
{
	trx_abort(trx_id);
	pthread_mutex_unlock(&trx_manager_latch);
	pthread_mutex_unlock(&lock_manager_latch);

	return IS_DEADLOCK;
}
			
pthread_mutex_unlock(&trx_manager_latch);
pthread_cond_wait(&(new_lock_obj->cond), &lock_manager_latch);

new_lock_obj->lock_state = ACQUIRED;
trx_table[trx_id].trx_state = WORKING;

pthread_mutex_unlock(&lock_manager_latch);
return SUCCESS;

Abort and Rollback

If a deadlock is detected, lock_acquire() calls trx_abort() to proceed with abort and rollback.

int trx_abort(int trx_id)
{
	trx_t& trx_obj = trx_table[trx_id];
	
	// 1. Undo all modified records by the transaction
	trx_roll_back_record(trx_obj.old_logs);
	lock_t* head = trx_obj.head_trx_lock_list;

	
	// 2. Release all acquired lock object
	lock_t* mov_temp = head;
	lock_t* del_temp = NULL;

	while (mov_temp != NULL)
	{
		del_temp = mov_temp;
		mov_temp = del_temp->trx_next_lock;
		if (lock_release(del_temp) != 0) return 0;
	}

	// 3. Remove the transaction table entry \n";
	trx_table.erase(trx_id);

	return 1;
}

This function does three things.

  • Undo all modified records by the transaction To this end, transactions that call db_update() call trx_write_log() every time a write operation is performed to save the changed records.
void trx_write_log(int64_t table_id, pagenum_t page_id, int64_t key, char* old_val, uint64_t old_val_size, int trx_id)
{
	old_log_t* old_log_obj = (old_log_t*)malloc(sizeof(old_log_t));

	old_log_obj->table_id = table_id;
	old_log_obj->page_id = page_id;
	old_log_obj->key = key;
	memcpy(old_log_obj->old_val, old_val, old_val_size);
	old_log_obj->old_val_size = old_val_size;

	trx_table[trx_id].old_logs.push(old_log_obj);
}

trx_roll_back_record() restores records to previous information through this old_logs

int trx_roll_back_record(std::stack<old_log_t*> old_logs)
{
	Leaf_page* leaf_page = (Leaf_page*)malloc(sizeof(Leaf_page));
	Slot slot;
	char delivery[PAGE_SIZE] = {};
	while (!old_logs.empty())
	{
		old_log_t* old_log = old_logs.top();
		old_logs.pop();

		int64_t table_id = old_log->table_id;
		pagenum_t page_id = old_log->page_id;
		int64_t key = old_log->key;


		buf_read_page(table_id, page_id, (page_t*)leaf_page); // page locked.			

		memcpy(delivery, leaf_page, PAGE_SIZE);
		
		long low = 0, high = leaf_page->numOfkeys-1, mid = 0;
		while (low <= high)
    	        {
        	        mid = (low + high) / 2;
		        memcpy(&slot, delivery+(SLOT_STARTING_POINT+SLOT_SIZE*mid), SLOT_SIZE);
        	        if ( slot.key == key ) break;
        	        else if ( slot.key > key ) high = mid-1;
        	        else low = mid+1;
    	        }
		if (low > high)
		{
			buf_page_latch_release(table_id, page_id);
			return -1;
		}
			
		memcpy(delivery+slot.offset, old_log->old_val, old_log->old_val_size);
		buf_write_page(table_id, page_id, (page_t*)delivery); // page unlock
		free(old_log);
	}
	free(leaf_page);
	return 0;
}
  • Release all acquired lock object Remove all lock objects belonging to the trx_lock_list that the transaction has.
lock_t* mov_temp = head;
lock_t* del_temp = NULL;

while (mov_temp != NULL)
{
	del_temp = mov_temp;
	mov_temp = del_temp->trx_next_lock;
	if (lock_release(del_temp) != 0) return 0;
}
  • Remove the transaction table entry Remove transactions corresponding to trx_id from the trx_table.
trx_table.erase(trx_id);

lock_release()

In trx_commit () or trx_abort(), they call lock_release to erase its lock object and wake up the successor lock object.

If there is only one lock object in bucket lock list OR there are other acquried lock object in record lock list, then just free my lock object and update the list(bucket lock list / record lock list)
Otherwise, if there are other lock object in record lock list and my lock object is only acquired lock, then wake up the successor lock object(transaction). And recursively lock object that woke up wakes up the successor lock object(transaction) only if they are not conflict. Then free my lock object and update the lists.

int lock_release(lock_t* lock_obj)  
{
	lock_Bucket* bucket = lock_obj->p_sentinel;
	
	// Case : There's only one lock object in lock list in bucket, then free the bucket.
	if (bucket->head->next == NULL)
	{
		bucket->head = NULL;
		bucket->tail = NULL;
		free(lock_obj);
		return 0;
	}
	else
	{
		if (!is_acquired_lock_in_record_lock_list(lock_obj))
		{
			lock_t* next_same_rid_lock_obj = lock_obj->same_rid_next;
			
			while (next_same_rid_lock_obj != NULL)
			{
				pthread_cond_signal(&next_same_rid_lock_obj->cond);

				if (!is_conflict_1(next_same_rid_lock_obj, next_same_rid_lock_obj->same_rid_next))
					next_same_rid_lock_obj = next_same_rid_lock_obj->same_rid_next;
				else break;
			}
		}
	}

	update_bucket_lock_list_after_release(bucket, lock_obj);
	update_record_lock_list_after_release(lock_obj);
	free(lock_obj);

	return 0;
}

Code Flow Summary

  1. db_find / db_update -> find leaf page.
  2. call lock_acquire()
  3. After acquired record lock, then re-acquire page latch and read/write operation and release page latch.

In the lock acquire()

  1. There is no bucket
    1.1 make bucket & make acquired S/X lock object
  2. There is a bucket
    2.1 if my transaction already acquired same or stronger lock type
    => Free pass
    2.2 Check whether there is conflict lock.
    2.2-1 NO Conflict => make acquired S/X lock object
    2.2-2 Is conflict => Check deadlock => abort OR waiting