-
Notifications
You must be signed in to change notification settings - Fork 1
/
thread_pool.c
184 lines (147 loc) · 5.25 KB
/
thread_pool.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#include "common_headers.h"
#include "definitions.h"
/*
scheduler* init_scheduler(char* policy, int buffer_size):
Initializes a schedular of type schedular*. Sets the data structure of
the schedular according to the scheduling policy
Returns the schedular
*/
scheduler* init_scheduler(char* policy, int buffer_size) {
scheduler* d = (scheduler*)malloc(sizeof(scheduler));
if(d == NULL) {
return NULL;
}
d->policy = policy;
d->buffer_size = buffer_size;
d->curr_size = 0;
// Decide the data structure based on scheduling policy
if (strcmp("SFF", policy) == 0) {
d->Heap = init_heap(buffer_size, 0);
d->Queue = NULL;
} else if (strcmp("FIFO", policy) == 0) {
d->Queue = init_queue(buffer_size);
d->Heap = NULL;
} else if(strcmp("SFNF", policy) == 0){
d->Heap = init_heap(buffer_size, 1);
d->Queue = NULL;
}
return d;
}
/*
thread_pool* init_thread_pool(int num_threads):
Creates a thread pool of size num_threads and initializes the locks of the thread pool
Returns the thread pool
*/
thread_pool* init_thread_pool(int num_threads) {
thread_pool* workers = (thread_pool*)malloc(sizeof(thread_pool));
if(workers == NULL) {
return NULL;
}
workers->num_threads = num_threads;
workers->working_threads = 0;
workers->pool = malloc(num_threads * sizeof(pthread_t));
Pthread_mutex_init(&workers->LOCK, NULL);
Pthread_cond_init(&workers->FILL, NULL);
Pthread_cond_init(&workers->EMPTY, NULL);
return workers;
}
/*
void start_threads(scheduler* d, thread_pool* workers):
This function creates worker threads and invoke the thread_worker function from all threads
*/
void start_threads(scheduler* d, thread_pool* workers) {
for (int i = 0; i < workers->num_threads; i++) {
thread_arg* arg = (thread_arg*)malloc(sizeof(thread_arg));
if(arg == NULL) {
printf("Thread number %d: Creation failed", i);
continue;
}
arg->workers = workers;
arg->scheduler = d;
arg->num_request = i;
Pthread_create(&workers->pool[i], NULL, thread_worker, arg);
}
}
/*
void schedule_new_request(scheduler* d, int conn_fd):
Depending on the policy
If the policy is SFF or SFNF, then get the corresponding file property from the request_file_properties(conn_fd),
and insert in the heap data structure, with the parameter returned from the request_file_properties.
If the policy is FIFO: Just insert the request in the queue.
*/
void schedule_new_request(scheduler* d, int conn_fd) {
if (strcmp(d->policy, "SFF") == 0 || strcmp("SFNF", d->policy) == 0) {
file_prop* fileProp = request_file_properties(conn_fd);
printf("File Size for %d : %ld and name %s\n", conn_fd,fileProp->file_size,fileProp->file_name);
insert_in_heap(conn_fd, fileProp->file_size,fileProp->file_name,d->Heap);
} else if (strcmp("FIFO", d->policy) == 0) {
insert_in_queue(conn_fd, d->Queue);
}
d->curr_size++;
}
/*
int pick_request(scheduler* d):
Depending on the policy, extracts conn file descriptor of the correct request
Returns the file descriptor
*/
int pick_request(scheduler* d) {
int conn_fd;
if (strcmp(d->policy, "SFF") == 0 || strcmp("SFNF", d->policy) == 0) {
conn_fd = extract_min(d->Heap);
} else if (strcmp("FIFO", d->policy) == 0) {
conn_fd = get_from_queue(d->Queue);
}
d->curr_size--;
return conn_fd;
}
/*
int is_scheduler_full(scheduler* d)
Returns 1 if the schedular is full
Else Returns 0
*/
int is_scheduler_full(scheduler* d) {
return d->curr_size == d->buffer_size;
}
/*
int is_scheduler_empty(scheduler* d)
Returns 1 if the schedular is empty
Else Returns 0
*/
int is_scheduler_empty(scheduler* d) {
return d->curr_size == 0;
}
/*
give_to_scheduler(thread_pool* workers, scheduler* d, int conn_fd):
It queries the scheduler to check if the queue or heap data stucture
is full or not.
If not full, it stores the current socket descriptor in the data
structure.
Else puts the caller thread to sleep.
*/
void give_to_scheduler(thread_pool* workers, scheduler* d, int conn_fd) {
Pthread_mutex_lock(&workers->LOCK);
while(is_scheduler_full(d)) {
Pthread_cond_wait(&workers->FILL, &workers->LOCK);
}
schedule_new_request(d, conn_fd);
Pthread_cond_signal(&workers->EMPTY);
Pthread_mutex_unlock(&workers->LOCK);
}
/*
get_from_scheduler(thread_pool* workers, scheduler* d):
It queries the scheduler to check if the queue or heap data stucture
is empty or not.
If not empty, it retrieves the required socket descriptor.
Else puts the caller thread to sleep.
*/
int get_from_scheduler(thread_pool* workers, scheduler* d) {
Pthread_mutex_lock(&workers->LOCK);
while(is_scheduler_empty(d)) {
Pthread_cond_wait(&workers->EMPTY, &workers->LOCK);
}
int conn_fd = pick_request(d);
printf("Request Scheduled for FD: %d\n", conn_fd);
Pthread_cond_signal(&workers->FILL);
Pthread_mutex_unlock(&workers->LOCK);
return conn_fd;
}