-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpef.cpp
2245 lines (1852 loc) · 76.4 KB
/
pef.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/**
* Copyright © 2019 Andrey Nevolin, https://github.com/AndreyNevolin
* Twitter: @Andrey_Nevolin
* LinkedIn: https://www.linkedin.com/in/andrey-nevolin-76387328
*
* Performance Evaluation Framework
*/
/* PEF includes */
#include "common.h"
#include "worker.h"
#include "time.h"
#include "config.h"
/* Standard C/C++ includes */
#include <unistd.h>
#include <errno.h>
/* Linux, POSIX, GNU includes */
#include <fcntl.h>
/* This "include" is required for "getrandom()" system call and - correspondingly - a
* version of "pef_GenerateInitialData()" that relies on this system call. Currently we
* don't use this version of "pef_GenerateInitialData()" by default because at the time
* of this writing most Linux installations were not fresh enough and didn't support
* "getrandom()".
* Uncomment this "include" and also the corresponding version of
* "pef_GenerateInitialData()" if you want to enable random data generation based on
* "getrandom()"
*
* #include <sys/random.h>
*/
/**
* Structure that represents a context-invariant argument (as opposed to context-specific
* arguments) for "pef_GetData()" function
*/
typedef struct
{
/* Buffer with pre-generated data. Filled once by "pef_GenerateInitialData()" and
   handed out as-is by "pef_GetData()"; ownership stays with the code that
   allocated it */
char *data_buf;
} pef_GetDataArg_t;
/**
* Global - i.e. not related to particular tasks or workers - execution statistics
*/
typedef struct
{
/* Time when actual benchmarking was started (i.e. when the worker threads were
   launched, as opposed to the preparation phase) */
pef_time_t benchmark_start_time;
/* Whether there were errors during benchmarking; aggregated across all workers */
bool is_errors;
} pef_GlobalStat_t;
/**
* References to data structures pre-allocated for workers
*/
typedef struct
{
/* Array of references to structures with shared summary statistics. One entry per
   worker; each structure is allocated cache-line-aligned to avoid false sharing */
pef_WorkerSharedSummary_t **shared_summary;
/* Array of references to structures with private summary statistics. One entry per
   worker */
pef_WorkerPrivateSummary_t **private_summary;
/* Array of references to arrays of structures with per-task statistics. Amount of
   tasks to do by each worker is known in advance. So, all the memory needed to keep
   the statistics can be pre-allocated in advance */
pef_WorkerTaskStat_t **stat_tasks;
/* Array of references to locks that guard access to data structures that are not
   private to workers. One lock per each worker. Allocated only when progress
   monitoring is enabled (see "screen_stat_timer"); otherwise stays NULL */
pthread_mutex_t **locks;
} pef_WorkerData_t;
/**
* Generate initial data
*
* (We call this step "initial" generation of data because it happens before the worker
* threads start doing their job. In future "in-flight" generation of data or some other
* strategies may be added)
*
 * Return value: 0 on success, non-zero error code otherwise. On success, a pointer to
 * the buffer with generated data is returned via "buf_ret". The buffer must be
 * deallocated after use by caller
*/
static int pef_GenerateInitialData( const pef_Opts_t* const opts,
char **buf_ret,
std::string indent,
char *err_msg,
int err_msg_size)
{
PEF_ASSERT( opts);
PEF_VERBOSE( opts, "%sGenerating %ld bytes of initial data", indent.c_str(),
opts->write_chunk_size);
int ret = 0;
char local_err_msg[PEF_ERR_MSG_SIZE];
const char dev_urandom[] = "/dev/urandom";
/* Allocate memory for the data buffer */
char *data_buf = (char *)malloc( opts->write_chunk_size);
if ( !data_buf )
{
PEF_BUFF_MSG( err_msg, err_msg_size, "Couldn't allocate memory for the data "
"buffer");
return PEF_RET_GENERIC_ERR;
}
int fd = open( dev_urandom, O_RDONLY);
int64_t bytes_read = -1;
if ( fd == -1 )
{
PEF_BUFF_MSG( err_msg, err_msg_size, "Couldn't open \"%s\": %s", dev_urandom,
PEF_STRERROR_R( local_err_msg, sizeof( local_err_msg)));
ret = PEF_RET_GENERIC_ERR;
goto pef_generate_initial_data_out;
}
bytes_read = read( fd, data_buf, opts->write_chunk_size);
if ( bytes_read == -1 )
{
PEF_BUFF_MSG( err_msg, err_msg_size, "Error reading \"%s\": %s", dev_urandom,
PEF_STRERROR_R( local_err_msg, sizeof( local_err_msg)));
ret = PEF_RET_GENERIC_ERR;
goto pef_generate_initial_data_out;
}
if ( bytes_read < opts->write_chunk_size )
{
PEF_BUFF_MSG( err_msg, err_msg_size, "\"%s\" returned less data than it was "
"requested (%ld < %ld)", dev_urandom, bytes_read,
opts->write_chunk_size);
ret = PEF_RET_GENERIC_ERR;
goto pef_generate_initial_data_out;
}
pef_generate_initial_data_out:
/* Don't care of a value returned by "close()" */
if ( fd >= 0 ) close( fd);
if ( ret || !buf_ret )
{
if ( data_buf ) free( data_buf);
} else
{
PEF_ASSERT( data_buf);
*buf_ret = data_buf;
}
return ret;
}
#if 0
/**
* This version of "pef_GenerateInitialData()" relies on "getrandom()" system call.
* Currently we don't use this version by default because at the time of this writing most
* Linux installations were not fresh enough and didn't support "getrandom()".
* Uncomment this function and also a corresponding "include" at the beginning of the file
* if you want to enable random data generation based on "getrandom()"
*/
/* NOTE: this whole definition sits inside "#if 0" - it is disabled, alternative
   implementation kept for reference (see the comment above the "#if 0") */
static int pef_GenerateInitialData( const pef_Opts_t* const opts,
char **buf_ret,
std::string indent,
char *err_msg,
int err_msg_size)
{
PEF_ASSERT( opts);
PEF_VERBOSE( opts, "%sGenerating %ld bytes of initial data", indent.c_str(),
opts->write_chunk_size);
int ret = 0;
char local_err_msg[PEF_ERR_MSG_SIZE];
int64_t num_bytes = -1;
/* Allocate memory for the data buffer; on success it is returned via "buf_ret" and
   must be deallocated by caller */
char *data_buf = (char *)malloc( opts->write_chunk_size);
if ( !data_buf )
{
PEF_BUFF_MSG( err_msg, err_msg_size, "Couldn't allocate memory for the data "
"buffer");
return PEF_RET_GENERIC_ERR;
}
/* NOTE(review): like "read()", "getrandom()" may legitimately return fewer bytes
   than requested for large requests; if this variant is ever re-enabled, consider
   looping until the buffer is full - TODO confirm */
num_bytes = getrandom( data_buf, opts->write_chunk_size, GRND_NONBLOCK);
if ( num_bytes == -1 )
{
PEF_BUFF_MSG( err_msg, err_msg_size, "Generation of data by means of "
"\"getrandom()\" failed: %s",
PEF_STRERROR_R( local_err_msg, sizeof( local_err_msg)));
ret = PEF_RET_GENERIC_ERR;
goto pef_generate_initial_data_out;
}
if ( num_bytes < opts->write_chunk_size )
{
/* NOTE(review): this path reports via PEF_ERROR while the sibling paths use
   PEF_BUFF_MSG - looks inconsistent; verify before re-enabling */
PEF_ERROR( "\"getrandom()\" returned less data than it was requested "
"(%ld < %ld)", num_bytes, opts->write_chunk_size);
ret = PEF_RET_GENERIC_ERR;
goto pef_generate_initial_data_out;
}
pef_generate_initial_data_out:
if ( ret || !buf_ret )
{
if ( data_buf ) free( data_buf);
} else
{
PEF_ASSERT( data_buf);
*buf_ret = data_buf;
}
return ret;
}
#endif
/**
* Provide with data of requested size
*
* Currently this function just returns a pointer to the data pre-generated by
* "pef_GenerateInitialData()" routine. The function extracts this pointer from
* "common_arg"
*
* Return value: pointer to a buffer with data
*/
static int pef_GetData( void *common_arg,
const pef_Opts_t* const opts,
int64_t size,
char **buf,
char *err_msg,
int err_msg_size)
{
PEF_ASSERT( common_arg && opts);
if ( (size < 0) || (size > opts->write_chunk_size) )
{
PEF_BUFF_MSG( err_msg, err_msg_size, "Size of requested data is invalid: %ld "
"(must be from 0 to %ld)", size, opts->write_chunk_size);
return PEF_RET_GENERIC_ERR;
}
if ( buf ) *buf = ((pef_GetDataArg_t*)common_arg)->data_buf;
return 0;
}
/**
* Get cache line size
*
* The implementation provided here is platform-specific. It's expected to work seamlessly
* on Linux
*
 * It's expected that PEF will be executed on a system with homogeneous CPUs (or on a
 * system with a single CPU). There is no mechanism in PEF to resolve false memory
 * sharing issues on systems with heterogeneous CPUs
*/
static int pef_GetCacheLineSize( const pef_Opts_t* const opts,
                                 int *cline_size_ret,
                                 std::string indent,
                                 char *err_msg,
                                 int err_msg_size)
{
    PEF_ASSERT( opts);
    PEF_VERBOSE( opts, "%sGetting cache line size", indent.c_str());
    indent += "\t";

    const char *ind = indent.c_str();
    char local_err_msg[PEF_ERR_MSG_SIZE];
    long cline_size = -1;

    /* Method 1: get cache line size using "sysconf()" */
    long cline_size_sysconf = sysconf( _SC_LEVEL1_DCACHE_LINESIZE);

    if ( cline_size_sysconf == -1 )
    {
        PEF_VERBOSE( opts, "%sCouldn't get cache line size by means of \"sysconf()\"",
                     ind);
    } else
    {
        PEF_VERBOSE( opts, "%sCache line size reported by \"sysconf()\": %ld", ind,
                     cline_size_sysconf);
    }

    /* Method 2: get cache line size using sysfs */
    long cline_size_sysfs = -1;
    const char *sysfs_path = "/sys/devices/system/cpu/cpu0/cache/index0/"
                             "coherency_line_size";
    const char *sysfs_err_prefix = "Couldn't get cache line size from SysFS: ";
    FILE *file = fopen( sysfs_path, "r");

    if ( !file )
    {
        PEF_VERBOSE( opts, "%s%serror opening \"%s\": %s", ind, sysfs_err_prefix,
                     sysfs_path,
                     PEF_STRERROR_R( local_err_msg, sizeof( local_err_msg)));
    } else
    {
        errno = 0;

        if ( fscanf( file, "%ld", &cline_size_sysfs) != 1 )
        {
            /* Restore the initial value of the variable. We do that because it's
               unclear from the C++ documentation whether "fscanf()" may modify the
               argument in case of errors */
            cline_size_sysfs = -1;

            if ( feof( file) )
            {
                PEF_VERBOSE( opts, "%s%sfile \"%s\" is empty", ind, sysfs_err_prefix,
                             sysfs_path);
            } else
            {
                PEF_ASSERT( errno);
                PEF_VERBOSE( opts, "%s%serror extracting a value from \"%s\": %s", ind,
                             sysfs_err_prefix, sysfs_path,
                             PEF_STRERROR_R( local_err_msg, sizeof( local_err_msg)));
            }
        } else
        {
            PEF_VERBOSE( opts, "%sCache line size obtained from SysFS: %ld", ind,
                         cline_size_sysfs);
        }

        /* BUG FIX: "fclose()" used to be called unconditionally, i.e. also when
           "fopen()" had failed and "file" was NULL, which is undefined behaviour.
           Close the file only when it was actually opened */
        /* Don't care of possible errors returned by "fclose()" */
        fclose( file);
    }

    if ( cline_size_sysconf == -1 && cline_size_sysfs == -1 )
    {
        PEF_BUFF_MSG( err_msg, err_msg_size, "Couldn't get cache line size by neither "
                      "of the methods");

        return PEF_RET_GENERIC_ERR;
    }

    if ( cline_size_sysconf != -1 && cline_size_sysfs != -1 &&
         cline_size_sysconf != cline_size_sysfs )
    {
        PEF_BUFF_MSG( err_msg, err_msg_size, "Cache line sizes obtained by different "
                      "methods are different");

        return PEF_RET_GENERIC_ERR;
    }

    if ( cline_size_sysconf != -1 )
    {
        PEF_VERBOSE( opts, "%sUsing cache line size returned by \"sysconf()\"", ind);
        cline_size = cline_size_sysconf;
    } else
    {
        PEF_VERBOSE( opts, "%sUsing cache line size obtained from SysFS\n", ind);
        cline_size = cline_size_sysfs;
    }

    /* Explicit narrowing: cache line sizes comfortably fit into "int" */
    if ( cline_size_ret ) *cline_size_ret = (int)cline_size;

    return 0;
}
/**
* Allocate memory for references to workers' data
*/
static int pef_AllocReferences( const pef_Opts_t* const opts,
pef_WorkerData_t *wdata,
std::string indent,
char *err_msg,
int err_msg_size)
{
PEF_ASSERT( opts);
int num_workers = opts->num_threads;
const char *wdata_error_prefix = "Couldn't allocate memory for an array of "
"references to workers' ";
pef_WorkerSharedSummary_t **shared_summary = 0;
pef_WorkerPrivateSummary_t **private_summary = 0;
pef_WorkerTaskStat_t **stat_tasks = 0;
pthread_mutex_t **locks = 0;
int ret = 0;
PEF_VERBOSE( opts, "%sAllocating memory for storing references to per-worker data "
"structures", indent.c_str());
shared_summary = (pef_WorkerSharedSummary_t**)
calloc( num_workers, sizeof( pef_WorkerSharedSummary_t*));
if ( !shared_summary )
{
PEF_BUFF_MSG( err_msg, err_msg_size, "%s%s", wdata_error_prefix,
"shared summary structures");
ret = PEF_RET_GENERIC_ERR;
goto pef_alloc_references_out;
}
private_summary = (pef_WorkerPrivateSummary_t**)
calloc( num_workers, sizeof( pef_WorkerPrivateSummary_t*));
if ( !private_summary )
{
PEF_BUFF_MSG( err_msg, err_msg_size, "%s%s", wdata_error_prefix,
"private summary structures");
ret = PEF_RET_GENERIC_ERR;
goto pef_alloc_references_out;
}
stat_tasks = (pef_WorkerTaskStat_t**)
calloc( num_workers, sizeof( pef_WorkerTaskStat_t*));
if ( !stat_tasks )
{
PEF_BUFF_MSG( err_msg, err_msg_size, "%s%s", wdata_error_prefix,
"per-task statistics data");
ret = PEF_RET_GENERIC_ERR;
goto pef_alloc_references_out;
}
if ( opts->screen_stat_timer != -1 )
{
locks = (pthread_mutex_t**)calloc( num_workers, sizeof( pthread_mutex_t*));
if ( !locks )
{
PEF_BUFF_MSG( err_msg, err_msg_size, "%s%s", wdata_error_prefix, "locks");
ret = PEF_RET_GENERIC_ERR;
goto pef_alloc_references_out;
}
}
pef_alloc_references_out:
if ( !ret && wdata )
{
wdata->shared_summary = shared_summary;
wdata->private_summary = private_summary;
wdata->stat_tasks = stat_tasks;
wdata->locks = locks;
} else
{
if ( shared_summary ) free( shared_summary);
if ( private_summary ) free( private_summary);
if ( stat_tasks ) free( stat_tasks);
if ( locks ) free( locks);
}
return ret;
}
/**
* Allocate memory for workers' summary statistics
*/
static int pef_AllocSummaryStructs( pef_Opts_t *opts,
                                    pef_WorkerData_t *wdata,
                                    bool is_shared,
                                    int cline_size,
                                    std::string indent,
                                    char *err_msg,
                                    int err_msg_size)
{
    /* Consistency fix: "opts" is dereferenced below, so assert it alongside
       "wdata" (as the sibling allocators do) */
    PEF_ASSERT( opts && wdata);

    const char *summary_type = is_shared ? "shared" : "private";

    PEF_VERBOSE( opts, "%sAllocating memory to store %s execution summary of each "
                 "worker", indent.c_str(), summary_type);
    indent += "\t";

    char local_err_msg[PEF_ERR_MSG_SIZE];
    const char *ind = indent.c_str();
    int struct_size = is_shared ? sizeof( pef_WorkerSharedSummary_t) :
                                  sizeof( pef_WorkerPrivateSummary_t);
    /* Calculate number of cache lines required to keep summary statistics. Each
       structure is rounded up to whole cache lines to avoid false sharing */
    int num_clines = struct_size / cline_size;

    if ( struct_size % cline_size ) num_clines++;

    PEF_VERBOSE( opts, "%s%d cache lines is required to keep each worker's %s summary "
                 "structure", ind, num_clines, summary_type);

    int64_t i = 0;

    for ( i = 0; i < opts->num_threads; i++ )
    {
        /* "num_clines * cline_size" is a multiple of "cline_size", as required by
           "aligned_alloc()" */
        void *ptr = aligned_alloc( cline_size, num_clines * cline_size);

        if ( !ptr ) break;

        /* Initialize the allocated structure */
        if ( is_shared )
        {
            pef_WorkerSharedSummary_t *shared_p = (pef_WorkerSharedSummary_t*)ptr;

            *shared_p = {.tasks_completed = 0};
            wdata->shared_summary[i] = shared_p;
        } else
        {
            pef_WorkerPrivateSummary_t *private_p = (pef_WorkerPrivateSummary_t*)ptr;

            *private_p = {.tasks_completed = 0};
            wdata->private_summary[i] = private_p;
        }
    }

    if ( i != opts->num_threads )
    {
        PEF_ASSERT( i < opts->num_threads);
        PEF_BUFF_MSG( err_msg, err_msg_size, "Couldn't allocate memory for a %s summary "
                      "structure of worker #%ld: %s", summary_type, i,
                      PEF_STRERROR_R( local_err_msg, sizeof( local_err_msg)));

        /* Unwind the allocations already done. BUG FIX: the freed references must be
           zeroed in the arrays; otherwise "pef_DeallocateWorkerStructs()" - which
           the caller runs on failure - would free the same pointers a second time
           (double free) */
        while ( --i >= 0 )
        {
            if ( is_shared )
            {
                free( wdata->shared_summary[i]);
                wdata->shared_summary[i] = 0;
            } else
            {
                free( wdata->private_summary[i]);
                wdata->private_summary[i] = 0;
            }
        }

        return PEF_RET_GENERIC_ERR;
    }

    return 0;
}
/**
* Allocate memory for workers' detailed statistics
*/
static int pef_AllocTaskStatStructs( pef_Opts_t *opts,
                                     pef_WorkerData_t *wdata,
                                     int cline_size,
                                     std::string indent,
                                     char *err_msg,
                                     int err_msg_size)
{
    PEF_ASSERT( opts && wdata);
    PEF_VERBOSE( opts, "%sAllocating memory to store detailed execution statistics of "
                 "each worker", indent.c_str());
    indent += '\t';

    const char *ind = indent.c_str();
    int64_t num_files = opts->num_files;
    int64_t num_workers = opts->num_threads;

    if ( opts->is_verbose )
    {
        /* Calculate size of memory required to keep the detailed statistics. Workers
           with index below "num_files % num_workers" get one extra task */
        int64_t group1_size = num_files % num_workers;
        int64_t group2_size = num_workers - group1_size;
        int64_t tasks_per_worker = num_files / num_workers;

        /* Memory per worker from the first group */
        /* Check that we will not have overflow on the multiplication below */
        PEF_ASSERT( INT64_MAX / (int64_t)sizeof( pef_WorkerTaskStat_t) >=
                    tasks_per_worker + 1);

        int64_t mem_sz1 = (tasks_per_worker + 1) * sizeof( pef_WorkerTaskStat_t);
        /* Memory per worker from the second group */
        int64_t mem_sz2 = tasks_per_worker * sizeof( pef_WorkerTaskStat_t);

        /* Account for alignment */
        /* Check that we will not have overflow on the multiplications below */
        PEF_ASSERT( INT64_MAX - cline_size >= mem_sz1);
        PEF_ASSERT( INT64_MAX - cline_size >= mem_sz2);
        mem_sz1 = ((mem_sz1 / cline_size) + (mem_sz1 % cline_size ? 1 : 0)) *
                  cline_size;
        mem_sz2 = ((mem_sz2 / cline_size) + (mem_sz2 % cline_size ? 1 : 0)) *
                  cline_size;
        /* Check that we will not get overflow while calculating "memory_needed" */
        /* "group1_size" may be equal to zero */
        if ( group1_size ) PEF_ASSERT( INT64_MAX / group1_size >= mem_sz1);

        PEF_ASSERT( INT64_MAX / group2_size >= mem_sz2);
        PEF_ASSERT( INT64_MAX - group1_size * mem_sz1 >= group2_size * mem_sz2);

        int64_t memory_needed = group1_size * mem_sz1 + group2_size * mem_sz2;

        PEF_OUT( "%s%ld bytes of memory is required to keep detailed statistics of "
                 "all worker threads", ind, memory_needed);
    }

    int64_t i = 0;

    for ( i = 0; i < num_workers; i++ )
    {
        int64_t tasks_to_do = num_files / num_workers;

        if ( i < (num_files % num_workers ) ) tasks_to_do++;

        /* BUG FIX: the overflow checks used to run AFTER the corresponding
           arithmetic. Signed overflow is undefined behaviour, so the checks must
           precede the operations they guard */
        PEF_ASSERT( INT64_MAX / (int64_t)sizeof( pef_WorkerTaskStat_t) >= tasks_to_do);

        int64_t mem_size = tasks_to_do * sizeof( pef_WorkerTaskStat_t);

        /* Take alignment into account: round up to whole cache lines (this can add
           at most "cline_size" bytes, hence the assert) */
        PEF_ASSERT( INT64_MAX - cline_size >= mem_size);
        mem_size = ((mem_size / cline_size) + (mem_size % cline_size ? 1 : 0)) *
                   cline_size;
        /* Finally allocate memory */
        (wdata->stat_tasks)[i] = (pef_WorkerTaskStat_t*)aligned_alloc( cline_size,
                                                                       mem_size);

        if ( (wdata->stat_tasks)[i] == 0 ) break;

        /* Initialize allocated structures */
        for ( int64_t j = 0; j < tasks_to_do; j++ )
        {
            (wdata->stat_tasks)[i][j] = {.completion_time = PEF_TIME_ZERO};
        }
    }

    if ( i < num_workers )
    {
        PEF_BUFF_MSG( err_msg, err_msg_size, "Couldn't allocate memory for the detailed "
                      "execution statistics of worker #%ld", i);

        /* Unwind. BUG FIX: zero the freed references; otherwise
           "pef_DeallocateWorkerStructs()" - run by the caller on failure - would
           free them a second time (double free) */
        while ( --i >= 0 )
        {
            free( (wdata->stat_tasks)[i]);
            (wdata->stat_tasks)[i] = 0;
        }

        return PEF_RET_GENERIC_ERR;
    }

    return 0;
}
/**
* Allocate locks used to guard access to workers' shared data
*/
static int pef_AllocLocks( pef_Opts_t *opts,
                           pef_WorkerData_t *wdata,
                           int cline_size,
                           std::string indent,
                           char *err_msg,
                           int err_msg_size)
{
    PEF_ASSERT( opts && wdata);
    PEF_VERBOSE( opts, "%sAllocating memory for locks used to guard access to "
                 "workers' shared execution summary", indent.c_str());
    indent += '\t';

    const char *ind = indent.c_str();

    /* Calculate number of cache lines needed to store one lock */
    /* Most of the code below is redundant because "pthread_mutex_t" is a really
       small data structure, while cache lines are more or less big. But we keep the
       code universal */
    int num_clines = sizeof( pthread_mutex_t) / cline_size;

    if ( sizeof( pthread_mutex_t) % cline_size ) num_clines++;

    PEF_VERBOSE( opts, "%s%d cache lines is required to keep one lock", ind,
                 num_clines);

    int64_t i = 0;

    for ( i = 0; i < opts->num_threads; i++ )
    {
        pthread_mutex_t *lock_p = 0;

        /* Definitely there will be no overflow on the multiplication below */
        lock_p = (pthread_mutex_t*)aligned_alloc( cline_size,
                                                  num_clines * cline_size);

        if ( !lock_p ) break;

        *lock_p = PTHREAD_MUTEX_INITIALIZER;
        wdata->locks[i] = lock_p;
    }

    if ( i < opts->num_threads )
    {
        PEF_BUFF_MSG( err_msg, err_msg_size, "Couldn't allocate memory for the lock of "
                      "worker #%ld", i);

        /* Unwind. BUG FIX: zero the freed references; otherwise
           "pef_DeallocateWorkerStructs()" - run by the caller on failure - would
           free them a second time (double free). Also destroy the mutexes before
           freeing, for consistency with "pef_DeallocateWorkerStructs()" */
        while ( --i >= 0 )
        {
            pthread_mutex_destroy( wdata->locks[i]);
            free( wdata->locks[i]);
            wdata->locks[i] = 0;
        }

        return PEF_RET_GENERIC_ERR;
    }

    return 0;
}
/**
* Deallocate memory occupied by workers' data structures
*/
static void pef_DeallocateWorkerStructs( pef_Opts_t *opts,
pef_WorkerData_t *wdata,
std::string indent)
{
PEF_ASSERT( opts && wdata);
PEF_VERBOSE( opts, "%sDeallocating memory occupied by workers' data structures",
indent.c_str());
/* The code below assumes that (*wdata) and all the arrays referenced from (*wdata)
were properly initialized right during allocation (namely be zeros). Every freed
reference is also zeroed here, so the function is safe to call more than once */
if ( wdata->shared_summary )
{
for ( int i = 0; i < opts->num_threads; i++ )
{
if ( wdata->shared_summary[i] ) free( wdata->shared_summary[i]);
}
free( wdata->shared_summary);
wdata->shared_summary = 0;
}
if ( wdata->private_summary )
{
for ( int i = 0; i < opts->num_threads; i++ )
{
if ( wdata->private_summary[i] ) free( wdata->private_summary[i]);
}
free( wdata->private_summary);
wdata->private_summary = 0;
}
if ( wdata->stat_tasks )
{
for ( int i = 0; i < opts->num_threads; i++ )
{
if ( wdata->stat_tasks[i] ) free( wdata->stat_tasks[i]);
}
free( wdata->stat_tasks);
wdata->stat_tasks = 0;
}
if ( wdata->locks )
{
/* Memory locks are allocated only if progress monitoring is requested */
PEF_ASSERT( opts->screen_stat_timer != -1);
for ( int i = 0; i < opts->num_threads; i++ )
{
if ( wdata->locks[i] )
{
/* Mutexes must be destroyed before their memory is released */
pthread_mutex_destroy( wdata->locks[i]);
free( wdata->locks[i]);
}
}
free( wdata->locks);
wdata->locks = 0;
}
return;
}
/**
* Allocate data structures that scale in the same way as workers scale (i.e. each worker
* will have its own set of those structures):
* - structures used to store per-worker execution statistics
* - locks that exist per worker
*
* All data structures allocated inside this function are aligned to cache line size to
* avoid false memory sharing issues
*/
static int pef_AllocateWorkerStructs( pef_Opts_t *opts,
pef_WorkerData_t *wdata_ret,
std::string indent,
char *err_msg,
int err_msg_size)
{
PEF_ASSERT( opts);
int ret = 0;
char local_err_msg[PEF_ERR_MSG_SIZE];
int cline_size = -1;
/* Local working copy; published to "wdata_ret" only on full success */
pef_WorkerData_t wdata = {.shared_summary = 0, .private_summary = 0, .stat_tasks = 0,
.locks = 0};
PEF_VERBOSE( opts, "%sAllocating memory to store per-worker execution statistics "
"and locks", indent.c_str());
indent += "\t";
const char *ind = indent.c_str();
/* Cache line size is needed to align (and pad) all per-worker structures, so that
different workers never share a cache line */
ret = pef_GetCacheLineSize( opts, &cline_size, indent, local_err_msg,
sizeof( local_err_msg));
if ( ret )
{
PEF_BUFF_MSG( err_msg, err_msg_size, "Error getting cache line size: %s",
local_err_msg);
return PEF_RET_GENERIC_ERR;
}
/* First allocate the arrays of references; the structures they point to come next */
ret = pef_AllocReferences( opts, &wdata, indent, local_err_msg,
sizeof( local_err_msg));
if ( ret )
{
PEF_BUFF_MSG( err_msg, err_msg_size, "Error allocating arrays of references to "
"per-worker data structures: %s", local_err_msg);
return PEF_RET_GENERIC_ERR;
}
ret = pef_AllocSummaryStructs( opts, &wdata, true, cline_size, indent, local_err_msg,
sizeof( local_err_msg));
if ( ret )
{
PEF_BUFF_MSG( err_msg, err_msg_size, "Error allocating shared execution summary "
"structs: %s", local_err_msg);
ret = PEF_RET_GENERIC_ERR;
goto pef_allocate_per_worker_structs_out;
}
ret = pef_AllocSummaryStructs( opts, &wdata, false, cline_size, indent,
local_err_msg, sizeof( local_err_msg));
if ( ret )
{
PEF_BUFF_MSG( err_msg, err_msg_size, "Error allocating private execution "
"summary structs: %s", local_err_msg);
ret = PEF_RET_GENERIC_ERR;
goto pef_allocate_per_worker_structs_out;
}
ret = pef_AllocTaskStatStructs( opts, &wdata, cline_size, indent, local_err_msg,
sizeof( local_err_msg));
if ( ret )
{
PEF_BUFF_MSG( err_msg, err_msg_size, "Error allocating structs for storing "
"detailed execution statistics: %s", local_err_msg);
ret = PEF_RET_GENERIC_ERR;
goto pef_allocate_per_worker_structs_out;
}
/* Locks are needed only when progress monitoring is enabled */
if ( opts->screen_stat_timer != -1 )
{
ret = pef_AllocLocks( opts, &wdata, cline_size, indent, local_err_msg,
sizeof( local_err_msg));
if ( ret )
{
PEF_BUFF_MSG( err_msg, err_msg_size, "Error allocating memory for locks "
"used to guard access to workers' shared data: %s",
local_err_msg);
ret = PEF_RET_GENERIC_ERR;
goto pef_allocate_per_worker_structs_out;
}
} else
{
PEF_VERBOSE( opts, "%sSkipped allocating memory for locks. Workers are "
"expected to have exclusive access to all their structures", ind);
}
pef_allocate_per_worker_structs_out:
/* On any failure (or when the caller doesn't want the result) everything allocated
so far is released */
if ( !ret && wdata_ret ) *wdata_ret = wdata;
if ( ret || !wdata_ret ) pef_DeallocateWorkerStructs( opts, &wdata, indent);
return ret;
}
/**
* Compose array of worker arguments
*
 * Return: 0 on success; the array of worker arguments is returned via "args_ret" and
 * should be deallocated by caller
*/
static int pef_PrepareWorkerArgs( const pef_Opts_t* const opts,
pef_GetDataArg_t *get_data_arg,
pef_WorkerData_t *wdata,
pef_WorkerArg_t **args_ret,
std::string indent,
char *err_msg,
int err_msg_size)
{
PEF_ASSERT( opts && wdata);
PEF_VERBOSE( opts, "%sPreparing worker arguments", indent.c_str());
const char *ind = (indent += "\t").c_str();
int num_threads = opts->num_threads;
pef_WorkerArg_t *worker_args = 0;
worker_args = (pef_WorkerArg_t*)calloc( num_threads, sizeof( pef_WorkerArg_t));
if ( !worker_args )
{
PEF_BUFF_MSG( err_msg, err_msg_size, "Couldn't allocate array of worker "
"arguments");
return PEF_RET_GENERIC_ERR;
}
PEF_VERBOSE( opts, "%sAllocated array of worker arguments", ind);
for ( int i = 0; i < num_threads; i++ )
{
worker_args[i].worker_id = i;
worker_args[i].opts = opts;
worker_args[i].shared_summary = wdata->shared_summary[i];
worker_args[i].private_summary = wdata->private_summary[i];
worker_args[i].stat_tasks = wdata->stat_tasks[i];
worker_args[i].get_data = pef_GetData;
worker_args[i].get_data_arg = get_data_arg;
worker_args[i].err_msg[0] = '\0';
if ( wdata->locks ) worker_args[i].lock = wdata->locks[i];
}
PEF_VERBOSE( opts, "%sInitialized all the arguments", ind);
if ( args_ret ) *args_ret = worker_args; else free( worker_args);
return 0;
}
/**
* Spawn worker threads
*
* Return: array of thread descriptors, array of booleans indicating whether the
* corresponding worker was created
*
* If the function succeeds, both arrays must be deallocated by caller
*
* The array of boolean indicators is needed only by "pef_JoinWithAllTheThreads()"
* function for tracking which threads have been already joined with. But it's allocated
* here - and not inside "pef_JoinWithAllTheThreads()" - because allocation of memory can
* fail. Thus, before the threads are started, it's better to make sure that it will be
* possible to track their execution status
*/
static int pef_SpawnWorkerThreads( const pef_Opts_t* const opts,
pef_WorkerArg_t *worker_args,
pthread_t **threads_ret,
bool **is_thread_started_ret,
std::string indent,
char *err_msg,
int err_msg_size)
{
PEF_ASSERT( opts && worker_args && threads_ret && is_thread_started_ret);
PEF_VERBOSE( opts, "%sSpawning worker threads", indent.c_str());
int ret = 0;
const char *ind = (indent += "\t").c_str();
int64_t num_threads = opts->num_threads;
pthread_t *threads = (pthread_t*)calloc( num_threads, sizeof( pthread_t));
if ( !threads )
{
PEF_BUFF_MSG( err_msg, err_msg_size, "Couldn't allocate memory for an array of "
"worker thread descriptors");
return PEF_RET_GENERIC_ERR;
}
PEF_VERBOSE( opts, "%sAllocated memory for worker thread descriptors", ind);
bool *is_thread_started = (bool*)calloc( num_threads, sizeof( bool));
if ( !is_thread_started )
{
PEF_BUFF_MSG( err_msg, err_msg_size, "Couldn't allocate memory for an array of "
"booleans indicating which workers have been created");
free( threads);
return PEF_RET_GENERIC_ERR;
}
PEF_VERBOSE( opts, "%sAllocated memory for booleans indicating which workers have "
"been created", ind);
PEF_VERBOSE( opts, "%sCreating threads...", ind);
int64_t i = 0;
for ( i = 0; i < num_threads; i++ )
{
if ( pthread_create( &threads[i], NULL, pef_DoWork, &worker_args[i]) ) break;
is_thread_started[i] = true;
}
if ( i < num_threads )
{
PEF_EMIT_ERR_MSG( "Creation of worker #%ld failed", i);
PEF_BUFF_MSG( err_msg, err_msg_size, "Error creating worker thread #%ld", i);
ret = PEF_RET_GENERIC_ERR;
} else
{
PEF_VERBOSE( opts, "%s\tAll worker threads started", ind);
}
/* Even if the thread-creation loop failed to create ALL the threads, we still need to
join with the threads that WERE created. That's why this function returns a pointer
to the array of thread desctiptors (and also a pointer to the array of boolean