Skip to content

Commit

Permalink
Leafs can now have separate sources.
Browse files Browse the repository at this point in the history
  • Loading branch information
Griezn committed Dec 16, 2024
1 parent 2f6fd3e commit 26adda3
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 104 deletions.
4 changes: 1 addition & 3 deletions include/file_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@
typedef struct FileSource {
source_t source;
int fd;
uint32_t index;
uint32_t inc;
} file_source_t;

source_t *create_file_source(const char *filename, uint8_t wsize, uint32_t wstep);
source_t *create_file_source(const char *filename);

sink_t *create_file_sink();

Expand Down
5 changes: 0 additions & 5 deletions include/generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@
#define GENERATOR_H
#include "source.h"

#include <stdbool.h>

typedef struct GeneratorSource {
source_t source;
bool has_next;
} generator_source_t;

source_t *create_generator_source();
Expand All @@ -20,8 +17,6 @@ void free_generator_source(source_t *source);

void free_generator_sink(sink_t *sink);

#define GENERATOR_SIZE 10

enum subject {
SUBJECT_ALICE,
SUBJECT_BOB,
Expand Down
12 changes: 12 additions & 0 deletions include/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,16 @@ typedef struct {

void execute_query(const query_t *query, const source_t *source, sink_t *sink);

void join_triple_copy(const data_t *src1, uint32_t index1,
const data_t *src2, uint32_t index2, data_t *dest);

bool join_check(const data_t *src1, uint32_t index1,
const data_t *src2, uint32_t index2, join_params_t check);

void triple_copy(const data_t *src, uint32_t index, data_t *dest);

bool filter_check(const data_t *src, uint32_t index, filter_params_t check);

bool select_check(const data_t *src, uint32_t index, select_params_t param);

#endif //QUERY_H
5 changes: 4 additions & 1 deletion include/source.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
#define SOURCE_H
#include "data.h"

#include <stdbool.h>

// Created a source to enable other sources than generator e.g. network
typedef struct Source {
data_t buffer;
data_t* (*get_next)(const struct Source *self);
uint8_t index;
uint8_t calls;
data_t* (*get_next)(const struct Source *self, const uint8_t size, const uint8_t step, const uint8_t calls);
} source_t;

typedef struct Sink {
Expand Down
21 changes: 11 additions & 10 deletions src/file_source.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,29 @@
#include "utils.h"


data_t *get_next_file(const source_t *source)
data_t *get_next_file(const source_t *source, const uint8_t size, const uint8_t step, const uint8_t calls)
{
file_source_t *fs = (file_source_t*) source;
if (fs->index > fs->source.buffer.size) {
if (fs->source.index + step > fs->source.buffer.size) {
return NULL;
}

data_t *data = malloc(sizeof(data_t));
assert(data);
data->data = fs->source.buffer.data + (fs->index * fs->source.buffer.width);
data->size = min(fs->inc, fs->source.buffer.size - fs->index);
data->data = fs->source.buffer.data + (fs->source.index * fs->source.buffer.width);
data->size = min(size, fs->source.buffer.size - fs->source.index);
data->width = source->buffer.width;

fs->index += fs->inc;
if (++fs->source.calls == calls) {
fs->source.index += step;
fs->source.calls = 0;
}

return data;
}


source_t *create_file_source(const char *filename, uint8_t wsize, uint32_t wstep)
source_t *create_file_source(const char *filename)
{
file_source_t *fs = malloc(sizeof(file_source_t));

Expand All @@ -60,10 +63,8 @@ source_t *create_file_source(const char *filename, uint8_t wsize, uint32_t wstep
fs->source.buffer.size = sb.st_size / sizeof(triple_t);
fs->source.buffer.width = 1;
fs->source.get_next = get_next_file;
fs->index = 0;
fs->inc = wstep;

(void) wsize;
fs->source.index = 0;
fs->source.calls = 0;

return (source_t*) fs;
}
Expand Down
15 changes: 10 additions & 5 deletions src/generator.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,19 @@ triple_t triples[] = {

#define NUM_TRIPLES (sizeof(triples) / sizeof(triples[0]))

data_t* get_next_generator(const source_t *source) {
data_t* get_next_generator(const source_t *source, const uint8_t size, const uint8_t step, const uint8_t calls) {
generator_source_t* generator = (generator_source_t*) source;
if (!generator->has_next) {
if (generator->source.index + size > generator->source.buffer.size) {
return NULL;
}
generator->has_next = false;

data_t *data = malloc(sizeof(data_t));
*data = generator->source.buffer;;
*data = generator->source.buffer;

if (++generator->source.calls == calls) {
generator->source.index += step;
generator->source.calls = 0;
}

return data;
}
Expand All @@ -76,7 +80,8 @@ source_t *create_generator_source()
generator_source_t *source = malloc(sizeof(generator_source_t));
source->source.buffer = (data_t) {triples, NUM_TRIPLES, 1};
source->source.get_next = get_next_generator;
source->has_next = true;
source->source.index = 0;
source->source.calls = 0;

return (source_t*) source;
}
Expand Down
4 changes: 2 additions & 2 deletions src/multi_source.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <stdio.h>


data_t *get_next_multi_source(const source_t *source)
data_t *get_next_multi_source(const source_t *source, const uint8_t size, const uint8_t step, const uint8_t calls)
{
multi_source_t *ms = (multi_source_t*) source;
data_t* datas[ms->num_sources];
Expand All @@ -18,7 +18,7 @@ data_t *get_next_multi_source(const source_t *source)

// fetch data, set size
for (int i = 0; i < ms->num_sources; ++i) {
data_t *next = ms->sources[i]->get_next(ms->sources[i]);
data_t *next = ms->sources[i]->get_next(ms->sources[i], size, step, calls);

if (next == NULL)
return NULL;
Expand Down
81 changes: 41 additions & 40 deletions src/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
//
#include "query.h"
#include "data.h"
#include "utils.h"

#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>


Expand Down Expand Up @@ -56,20 +54,20 @@ void filter(const data_t *in, data_t *out, const filter_params_t param)


/// The window operator creates a copy of the input stream in a newly specified size
/// @param in The input stream
/// @param out A selection of the input stream
/// @param param The window parameter containing a size of the window
void window(const data_t *in, data_t *out, const uint32_t param)
/// @param params The window parameter
bool window(data_t *out, const window_params_t params)
{
const uint32_t size = min(in->size, param) * in->width;
out->data = malloc(size * sizeof(triple_t));
assert(out->data);
out->size = in->size;
out->width = in->width;
data_t* data = params.source->get_next(params.source, params.size, params.step, params.calls);

// TODO: FIX LATER
//in->size = in->size - size;
memcpy(out->data, in->data, size * sizeof(triple_t));
if (data == NULL) {
*params.quit = true;
return false;
}

*out = *data;
free(data);
return true;
}


Expand All @@ -95,19 +93,20 @@ void select_query(const data_t *in, data_t *out, const select_params_t param)
}


void execute_operator(const operator_t *operator_, const data_t *in, data_t *out);
bool execute_operator(const operator_t *operator_, const data_t *in, data_t *out);
void *execute_operator_thread(void *arg) {
const operator_thread_arg_t *targ = arg;
execute_operator(targ->operator_, targ->in, targ->out);
return NULL;
bool *return_value = malloc(sizeof(bool));
*return_value = execute_operator(targ->operator_, targ->in, targ->out);
return return_value;
}


/// This function executed the right operator
/// @param operator The operator to be executed
/// @param in The input stream
/// @param out The output stream
void execute_operator(const operator_t *operator, const data_t *in, data_t *out)
bool execute_operator(const operator_t *operator, const data_t *in, data_t *out)
{
data_t tmpo1 = *in;
data_t tmpo2 = {NULL, 0, 1};
Expand All @@ -117,20 +116,11 @@ void execute_operator(const operator_t *operator, const data_t *in, data_t *out)
assert(operator->left);
assert(operator->right);

// Thread arguments
operator_thread_arg_t left_arg = {operator->left, in, &tmpo1};
operator_thread_arg_t right_arg = {operator->right, in, &tmpo2};

// Threads
pthread_t left_thread, right_thread;
bool left_bool = execute_operator(operator->left, in, &tmpo1);
bool right_bool = execute_operator(operator->right, in, &tmpo2);

// Execute left and right operators in parallel
pthread_create(&left_thread, NULL, execute_operator_thread, &left_arg);
pthread_create(&right_thread, NULL, execute_operator_thread, &right_arg);

// Wait for threads to finish
pthread_join(left_thread, NULL);
pthread_join(right_thread, NULL);
if (!left_bool || !right_bool)
return false;

join(&tmpo1, &tmpo2, out, operator->params.join);

Expand All @@ -139,30 +129,43 @@ void execute_operator(const operator_t *operator, const data_t *in, data_t *out)
break;
case FILTER:
if (operator->left) {
execute_operator(operator->left, in, &tmpo1);
if(!execute_operator(operator->left, in, &tmpo1))
return false;
}

filter(&tmpo1, out, operator->params.filter);
break;
case WINDOW:
if (operator->left) {
execute_operator(operator->left, in, &tmpo1);
if(!execute_operator(operator->left, in, &tmpo1))
return false;
}

window(&tmpo1, out, operator->params.window);
if (!window(out, operator->params.window))
return false;

break;
case SELECT:
if (operator->left)
execute_operator(operator->left, in, &tmpo1);
if (operator->left) {
if(!execute_operator(operator->left, in, &tmpo1))
return false;
}

select_query(&tmpo1, out, operator->params.select);
break;
}


if (tmpo1.data != in->data) {
assert(operator->left);
if (operator->left->type == WINDOW)
return true;

free(tmpo1.data);
tmpo1.data = NULL;
}

return true;
}


Expand All @@ -173,12 +176,10 @@ void execute_operator(const operator_t *operator, const data_t *in, data_t *out)
void execute_query(const query_t *query, const source_t *source, sink_t *sink)
{
data_t data = {NULL, 0, 1};
data_t* next_data = NULL;

while ((next_data = source->get_next(source)) != NULL) {
execute_operator(query->root, next_data, &data);
(void)source;
while (!query->quit) {
execute_operator(query->root, &data, &data);
sink->push_next(sink, &data);
free(next_data);
next_data = NULL;
}
}
18 changes: 9 additions & 9 deletions tests/dataTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,13 @@ TEST(DataTests, test_filter_check)
TEST(DataTests, test_file_source)
{
source_t *gsource = create_generator_source();
source_t *fsource = create_file_source("../../tests/triples.bin" ,1, 255);
source_t *fsource = create_file_source("../../tests/triples.bin");

sink_t *gsink = create_generator_sink();
sink_t *fsink = create_generator_sink();

data_t *next_gdata = gsource->get_next(gsource);
data_t *next_fdata = fsource->get_next(fsource);
data_t *next_gdata = gsource->get_next(gsource, 36, 36, 1);
data_t *next_fdata = fsource->get_next(fsource, 36, 36, 1);

gsink->push_next(gsink, next_gdata);
fsink->push_next(fsink, next_fdata);
Expand All @@ -165,7 +165,7 @@ TEST(DataTests, test_file_source)

free_generator_source(gsource);
free_file_source(fsource);
free(gsink); // not the normal because it is still the array allocated int he lib
free(gsink); // not the normal because it is still the array allocated in the lib
free(fsink); // not the normal because buffer is freed in source
}

Expand All @@ -174,27 +174,27 @@ TEST(DataTests, test_file_source_inc)
{
constexpr uint32_t increment = 12;
source_t *gsource = create_generator_source();
source_t *fsource = create_file_source("../../tests/triples.bin" ,1, increment);
source_t *fsource = create_file_source("../../tests/triples.bin");

sink_t *gsink = create_generator_sink();
sink_t *fsink = create_generator_sink();

data_t *next_gdata = gsource->get_next(gsource);
data_t *next_fdata = fsource->get_next(fsource);
data_t *next_gdata = gsource->get_next(gsource, increment, increment, 1);
data_t *next_fdata = fsource->get_next(fsource, increment, increment, 1);

gsink->push_next(gsink, next_gdata);
fsink->push_next(fsink, next_fdata);

ASSERT_TRUE(ARR_EQ(gsink->buffer.data, fsink->buffer.data, increment));

free(next_fdata);
next_fdata = fsource->get_next(fsource);
next_fdata = fsource->get_next(fsource, increment, increment, 1);
fsink->push_next(fsink, next_fdata);

ASSERT_TRUE(ARR_EQ(gsink->buffer.data + increment, fsink->buffer.data, increment));

free(next_fdata);
next_fdata = fsource->get_next(fsource);
next_fdata = fsource->get_next(fsource, increment, increment,1);
fsink->push_next(fsink, next_fdata);

ASSERT_TRUE(ARR_EQ(gsink->buffer.data + 2*increment, fsink->buffer.data, increment));
Expand Down
Loading

0 comments on commit 26adda3

Please sign in to comment.