-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcompress_queue.c
More file actions
123 lines (95 loc) · 2.45 KB
/
compress_queue.c
File metadata and controls
123 lines (95 loc) · 2.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#include "compress_queue.h"
#include <malloc.h>
void *worker(void *data)
{
compress_queue_t *queue = (compress_queue_t *)data;
compress_job_t *job;
uLongf tmp;
pthread_mutex_lock(&queue->mutex);
for (;;)
{
while (!queue->next_job)
pthread_cond_wait(&queue->cond, &queue->mutex);
job = queue->next_job;
queue->next_job = job->next;
if (!queue->next_job)
queue->last_job = NULL;
pthread_mutex_unlock(&queue->mutex);
tmp = job->dst_len;
compress2(job->dst, &tmp, job->src, job->src_len, 9);
job->dst_len = tmp;
job->status = COMPRESS_JOB_DONE;
pthread_mutex_lock(&queue->mutex);
job->next = queue->done_job;
queue->done_job = job;
}
}
int compress_queue_init(compress_queue_t *queue, int threads)
{
queue->next_job = NULL;
queue->last_job = NULL;
queue->done_job = NULL;
pthread_mutex_init(&queue->mutex, NULL);
pthread_cond_init(&queue->cond, NULL);
int i;
pthread_t thread;
for (i=0; i<threads; ++i)
{
pthread_create(&thread, NULL, worker, queue);
}
return 0;
}
int compress_queue_add(compress_queue_t *queue, compress_job_t *job)
{
pthread_mutex_lock(&queue->mutex);
if (!queue->last_job)
{
if (!queue->next_job)
queue->next_job = job;
}
else
queue->last_job->next = job;
queue->last_job = job;
job->status = COMPRESS_JOB_PENDING;
pthread_cond_signal(&queue->cond);
pthread_mutex_unlock(&queue->mutex);
return 0;
}
compress_job_t *compress_queue_pop(compress_queue_t *queue)
{
compress_job_t *job;
if (!queue->done_job)
return NULL;
pthread_mutex_lock(&queue->mutex);
if (!queue->done_job)
{
pthread_mutex_unlock(&queue->mutex);
return NULL;
}
job = queue->done_job;
queue->done_job = job->next;
pthread_mutex_unlock(&queue->mutex);
return job;
}
int compress_job_init(compress_job_t *job, void *src, int src_len)
{
job->status = COMPRESS_JOB_INIT;
job->src = src;
job->src_len = src_len;
job->dst_len = compressBound(src_len);
if (!(job->dst = malloc(job->dst_len)))
return -1;
job->next = NULL;
return 0;
}
int compress_job_del(compress_job_t *job)
{
if (job->status == COMPRESS_JOB_PENDING)
return -1;
if (job->dst)
{
free(job->dst);
job->dst = NULL;
}
return 0;
}