-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbuffer_queue.c
100 lines (88 loc) · 3.11 KB
/
buffer_queue.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/*
* Copyright (C) 2024 Mikhail Burakov. This file is part of streamer.
*
* streamer is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* streamer is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with streamer. If not, see <https://www.gnu.org/licenses/>.
*/
#include "buffer_queue.h"

#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <threads.h>
/* Mutex-guarded FIFO of item pointers kept in a growable array. */
struct BufferQueue {
  /* Locked by the queue and dequeue operations while they touch the fields
   * below. */
  mtx_t mutex;
  /* Array of queued item pointers; items[0] is the next one dequeued. */
  struct BufferQueueItem** items;
  /* Number of item pointers currently stored in items. */
  size_t size;
  /* Number of pointer slots currently allocated for items. */
  size_t alloc;
};
/*
 * Allocates a new item holding its own copy of `size` bytes read from `data`.
 *
 * Returns the new item, or NULL when the arguments are unusable (NULL data
 * with a non-zero size, or a size so large that header + payload would
 * overflow size_t) or when the allocation fails. The returned item can be
 * released with BufferQueueItemDestroy().
 */
struct BufferQueueItem* BufferQueueItemCreate(const void* data, size_t size) {
  const size_t header_size = sizeof(struct BufferQueueItem);

  /* A non-empty payload needs a source to copy from. */
  if (!data && size) return NULL;
  /* Reject sizes for which header_size + size would wrap around and make
   * malloc() return a block smaller than the copy below writes. */
  if (size > SIZE_MAX - header_size) return NULL;

  struct BufferQueueItem* buffer_queue_item = malloc(header_size + size);
  if (!buffer_queue_item) return NULL;
  buffer_queue_item->size = size;
  /* memcpy() requires valid pointers even for a zero-length copy, so skip it
   * for empty payloads (data may legitimately be NULL then). */
  if (size) memcpy(buffer_queue_item->data, data, size);
  return buffer_queue_item;
}
/*
 * Releases an item by handing it to free(). Because free() ignores NULL,
 * passing a NULL item is a no-op.
 */
void BufferQueueItemDestroy(struct BufferQueueItem* buffer_queue_item) {
  free(buffer_queue_item);
}
/*
 * Allocates an empty queue and initializes its mutex.
 *
 * Returns the new queue, or NULL if the allocation or the mutex
 * initialization fails. A queue returned here can be released with
 * BufferQueueDestroy().
 */
struct BufferQueue* BufferQueueCreate(void) {
  /* calloc() zero-fills the struct, so the queue starts with no item array
   * and size == alloc == 0. */
  struct BufferQueue* buffer_queue = calloc(1, sizeof(struct BufferQueue));
  /* This function returns a pointer, so failure is reported as NULL. */
  if (!buffer_queue) return NULL;
  if (mtx_init(&buffer_queue->mutex, mtx_plain) != thrd_success)
    goto rollback_buffer_queue;
  return buffer_queue;

rollback_buffer_queue:
  free(buffer_queue);
  return NULL;
}
/*
 * Appends buffer_queue_item at the tail of buffer_queue.
 *
 * Returns true once the item pointer has been stored. Returns false, with
 * the queue contents unchanged, when the item is NULL, the mutex cannot be
 * locked, or the item array cannot be enlarged.
 */
bool BufferQueueQueue(struct BufferQueue* buffer_queue,
                      struct BufferQueueItem* buffer_queue_item) {
  /* A NULL item is rejected before the mutex is touched. */
  if (!buffer_queue_item) return false;
  if (mtx_lock(&buffer_queue->mutex) != thrd_success) return false;

  bool stored = false;
  if (buffer_queue->size == buffer_queue->alloc) {
    /* Every slot is in use: enlarge the array by exactly one slot. */
    const size_t grown = buffer_queue->alloc + 1;
    struct BufferQueueItem** resized =
        realloc(buffer_queue->items, sizeof(struct BufferQueueItem*) * grown);
    /* On failure the old array is still valid and stays in place. */
    if (!resized) goto unlock;
    buffer_queue->items = resized;
    buffer_queue->alloc = grown;
  }
  buffer_queue->items[buffer_queue->size++] = buffer_queue_item;
  stored = true;

unlock:
  mtx_unlock(&buffer_queue->mutex);
  return stored;
}
/*
 * Removes the item at the head of buffer_queue and stores it in
 * *buffer_queue_item; stores NULL there when the queue is empty.
 *
 * Returns true in both of those cases. Returns false only when the mutex
 * cannot be locked, in which case *buffer_queue_item is not written.
 */
bool BufferQueueDequeue(struct BufferQueue* buffer_queue,
                        struct BufferQueueItem** buffer_queue_item) {
  if (mtx_lock(&buffer_queue->mutex) != thrd_success) return false;

  struct BufferQueueItem* head = NULL;
  if (buffer_queue->size != 0) {
    const size_t remaining = buffer_queue->size - 1;
    head = buffer_queue->items[0];
    /* Shift the remaining pointers down one slot so the next item to hand
     * out sits at index 0 again. */
    memmove(buffer_queue->items, buffer_queue->items + 1,
            sizeof(struct BufferQueueItem*) * remaining);
    buffer_queue->size = remaining;
  }
  *buffer_queue_item = head;

  mtx_unlock(&buffer_queue->mutex);
  return true;
}
/*
 * Destroys every item still stored in buffer_queue, then releases the item
 * array, the mutex and the queue object itself.
 *
 * The mutex is not locked here, and buffer_queue is dereferenced
 * unconditionally, so it must not be NULL.
 */
void BufferQueueDestroy(struct BufferQueue* buffer_queue) {
  size_t index = 0;
  while (index < buffer_queue->size) {
    BufferQueueItemDestroy(buffer_queue->items[index]);
    ++index;
  }
  free(buffer_queue->items);
  mtx_destroy(&buffer_queue->mutex);
  free(buffer_queue);
}