/*< This is the source code for a program that builds a fast data structure
containing all the reads from a gene sequencing system. Click on comments
like this one to drill down to the code that implements the main algorithm. */
/*< This source file is a complete program and no other files are required to
duplicate/test this work. Download maketrie.c from the same web directory
for a compilable copy.
It compiles with something like:
<tt>mpicc -openmp -O -o maketrie -Wall maketrie.c</tt>
and can be used with your own data in fastq format by:
<tt>./maketrie file.fastq</tt>
The code will run on a single node but does need to be compiled on an MPI
system. It currently uses MPI to scale the amount of RAM used to the problem
size; however, the more memory each processor has, the more efficiently
the code will run. A run of an earlier version of this code (which determined
overlaps as well as building the trie) took 24hrs on 8 regular compute nodes,
but ran in 4 hrs on a single compute node which had 8 times the normal amount
of RAM.
This version of the code is deliberately simple and does not use any
optimisations that would obfuscate the algorithm, in order to serve as a
blueprint for others to include this algorithm in their gene assembly pipelines.
Graham Toal <<tt>gtoal@gtoal.com</tt>>
May 2013.
*/
/*
This code is intended to solve one part of de novo gene assembly.
Currently it solves a simplified version of the problem described as
follows:
* Generate a gene (a string of length 'G') consisting of the letters CGAT
chosen randomly. G is a large number between tens of thousands and
billions. For this test, we do *not* read the complementary string.
(Add that later. Shouldn't add significantly more complexity to
overlap detection, but will double the amount of data to be handled)
Neither do we yet handle repeats (eg take a 4k substring and insert
elsewhere, optionally with about 1 in 10 random letter changes to
simulate evolutionary convergence - not relevant to overlap detection,
more significant for contig reassembly)
* Sample that string at random locations and extract 'K' substrings
of length 'L'. (These are called 'Reads' or 'Short Reads').
L is fixed for any one extract.
We're using L=37 although this code is independent of the size
of L. The output READs are *not* tagged with the location they
were found at in the gene. (see accompanying program genelab.c for
the software that creates this dummy data). We generate K READs
such that there should be sufficient samples to cover G and that
all READs have a high probability of overlapping with a nearby
READ.
* read in the READs and build a trie in memory containing all the READs.
This trie is likely to be larger than the ram capacity of a single
compute node. (A secondary goal is to see how large a gene can
be handled on a single compute node)
* trie-building is not a parallel activity but we can make use of
the memory of multiple nodes to keep the entire trie in RAM with
relatively little inter-processor communications overhead. Early
test results of trie-construction for G=100,000,000, K=40,000,000,
and L=37 are 5 minutes on a single node; for G=200,000,000,
K=80,000,000, and L=37 we require 3 nodes and the process takes 15
minutes. There is an overhead for using more than one node but it is
not significant.
* A second program (findoverlaps) then determines all the overlaps
between READs, and outputs a graph of these overlaps to a file.
A subsequent phase (preferably written by someone else) will extract
the lowest-cost path through that graph which covers the majority of
READs. Locating the overlaps is now the most expensive part of this
code, but fortunately it is embarrassingly parallel! By grouping nodes
into subgroups of sufficient nodes to hold a full copy of the trie, we
can have multiple such subgroups, each of which can handle
a different subset of the READs.
(duplicate READs are found on the fly as they are added to the trie.
We only record the first occurrence in the trie, writing the duplicate
information out to file at the point that the duplication is detected.
Unlike the later overlap detection, this process can happen on any
of the compute nodes, so there are multiple output files containing
subsets of the results, which need to be externally concatenated after
the run completes. This isn't a high overhead (it's just "cat") but
is a hidden cost when calculating the effectiveness of this algorithm)
At this point in development, this code builds the trie as above.
Once the trie is built correctly and confirmed, I will add the code
to calculate overlaps. This is actually a relatively low cost function
whose complexity is K * L, ie effectively linear. It can be parallelised
trivially in L (using OMP) and with a little more effort in K (using MPI),
ie overlap determination is embarrassingly parallel.
To determine an overlap, take a single READ and walk the trie from
the first letter in the READ. If the trie-walk has not prematurely
terminated before hitting the end of the READ, there is an overlap.
The overlap can be defined by storing the trie index at the furthest
point that was reached, and later by walking what is left in that
subtree from that stored index, you will reach the end of all the
matching READs. An identifier for the READ is stored at the end
of the trie entry for that READ.
Then repeat the last processes starting at the second, third, etc
letter in the READ to locate overlaps starting at those offsets.
This can either be done in a loop of size L or by L processes in
parallel.
The cost of a single comparison is O(L), reducing from L-1 to 2 (or
'MIN_OVERLAP', which will be larger - explanation elsewhere) as the
overlap length shortens. An overlap of 1 is not useful so we will
probably have a cut-off point later in terms of a minimal overlap -
not defined by length but by how many other READs overlap at this
offset... A graph of offset vs #overlap_count is also potentially
useful to determine critical sections of overlap - if overlaps
were random, it would be a logarithmic curve. If there are flat
sections, that tells us about a point of inflexion which may be
useful in graph re-assembly later)
Because trie lookups do not modify the trie, they are safe to execute
in parallel. (OMP, not MPI)
The final output of this code will be a file containing a complete
overlap graph which can be used to reconstitute G. Although I have
some ideas on how this might be done, I am going to restrict this
project to just the overlap calculation part to keep it tractable.
I do understand that the subset of the problem that this code solves
is not exactly the same as the real-life problem, but I'm throwing
it out there to people working in this area to see if the two domains
are sufficiently close that this can be adapted into a real system.
Please supply feedback including requests for any key components that
would make it applicable to real life gene assembly to:
Graham Toal <gtoal@gtoal.com>
(started in mid April 2013, working reliably by May 19th, 2013)
This code was tested using four nodes on the Texas Advanced Computing
Center's "Stampede" cluster.
PS This program has been algorithmically optimised but not low-level
optimised. A lot of operations can be pipelined and the low-level
code could be improved. However it is sufficiently fast that I
haven't felt the need so far, and it is a lot easier to understand
the way it is written compared to what I would have to do to it to
get perhaps a factor of 2 speedup at best.
*/
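/* Illustrative sketch (excluded from compilation with #if 0): the overlap
   walk described above, written out for the simple case where the whole
   trie_cell[] array is local to one node.  It uses the types and macros
   (INDEX, EDGE, trie_cell, ROOT_CELL, ENDS_WORD, EDGE_MASK, _A_.._N_)
   declared further down in this file; walk_suffix is a name invented for
   this sketch, and the distributed version would hand the walk over to the
   owning rank in the same way that add_read does. */
#if 0
static INDEX walk_suffix(const char *s)
{
  INDEX here = ROOT_CELL;
  int c;
  EDGE e;
  while ((*s != '\0') && (*s != '\n') && (*s != '\r')) {
    c = *s++;
    if (c == 'A') c = _A_;
    else if (c == 'C') c = _C_;
    else if (c == 'G') c = _G_;
    else if (c == 'T') c = _T_;
    else c = _N_;
    e = trie_cell[here].edge[c];
    if (e == 0ULL) return 0ULL;      /* walk terminated early: no overlap at this offset */
    if (e & ENDS_WORD) return here;  /* reached the terminal letter of a stored READ */
    here = e & EDGE_MASK;            /* otherwise descend to the next trie cell */
  }
  /* Suffix exhausted without a dead end: every READ in the subtree below
     'here' starts with this suffix, ie it overlaps the READ it came from. */
  return here;
}
#endif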
/*< <em>(Current 'To do' list.)</em> */
/*> TO DO:
7) Work out why there's a lost 15 minutes at the start of the
matching. Is it file-system related? Could we go faster
by walking the dawg than by re-reading the index file???
[Only at TACC - doesn't happen on UTPA cluster]
11) Store data in the terminal nodes, more than just a pointer to
fastq entry for that read. Could store dup count, quality data,
ultimately pointers to contig data, ie make this a full graph
suitable for input to a gene assembly graph algorithm.
Note that to preserve the purity of the space allocation (ie
the almost-free 'last_used_node++'), any data we want to attach
to the leaves *must* be made to fit into 40 bytes, whether it
naturally fits or not.
NOTE: 5 long longs * 8 bytes * 8 bits = 320 bits. Even using
3 bits per letter (to allow us to encode 'N's), that is 106 letters
we could store in a leaf!
So if storing the actual read at the leaf is helpful, we
could easily do it.
(And of course with only minimal effort we could use >1 leaf node,
if we really can't cope with the Procrustean solution)
14) Because I used 0L as a special NULL pointer for trie edges (ROOT_NODE being 1L),
I may have screwed up and made read #0 impossible to access. I think
I should renumber the reads from a base of 1 up. Serendipitously,
it appears from various documentation that I've since read, that
reads are conventionally numbered from 1 upwards anyway! So it won't
hurt to do a mass renaming.
15) BIG ONE HERE!: if we re-build the trie from the sorted data, adding
from L=1 to L=K using iterative deepening, we can guarantee to place
the first <N> chars of each string in the first memory block. Since
the majority of searches for matches are failures, this will speed up
the failures tremendously! And the beauty of it is that this is an
optional post-processing step that will speed up the data structure
but not change the contents in any significant way that affects
other code. Well worth doing if a saved trie is going to be accessed
repeatedly, especially with the code that runs on a single machine
and caches the first chunk of the trie_cell[] array...
16) In the short term:
give all available memory if only using 1 node.
give all available memory to the last node if using > 1 nodes - remember to
tweak chunksize to match so that it doesn't try to pass calls on to
a non-existent downstream node
In the long term:
Use modulo arithmetic rather than binary-and to separate by chunk size,
and remove constraint that chunks must be powers of two.
*/
/*>*//*>*/
/*< Declarations */
/*< '<tt>#include</tt>' header files. */
#define _FILE_OFFSET_BITS 64
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <mpi.h>
#include <omp.h>
#include <ctype.h>
#include <time.h> // for info only
// #define NDEBUG 1 // Set this if you want to remove the overhead of the assert tests.
#include <assert.h>
#ifndef FALSE
#define TRUE (0==0)
#define FALSE (!TRUE)
#endif
/*>*/
/*< Input/Output Files. */
static FILE *duplicates = NULL; // only rank 0 can write to this file.
static FILE *rejects = NULL; // only rank 0 can write to this file.
static FILE *read_file = NULL;
static FILE *read_index = NULL;
static FILE *trie_file = NULL;
static FILE *sorted_and_unique_reads = NULL; // only rank (mpisize-1) can write to this file.
/*>*/
/* This code goes to some effort to work out at runtime how much RAM is
available, and it allocates as much as it can up front for use by the
main data structure which is an array of structs.
In the event that this code is ported to a system where the dynamic
memory determination doesn't work, we do have a way to fall back on
a hard-coded default, which in the distribution is set to the relatively
low value of 4Gb below, in the hope that the code 'just works' the
first time someone compiles it. If this affects you and you're unable
to improve the dynamic memory-sizing code to understand your system,
increase the number below to suit your hardware... */
#define GBs 4L
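/* Illustrative sketch (excluded from compilation with #if 0): one common
   way to discover a node's physical RAM at run time on Linux/glibc, shown
   only to make the fallback above concrete.  physical_ram_bytes is a name
   invented for this sketch and _SC_PHYS_PAGES is a glibc/Linux extension
   rather than strict POSIX; the program's own detection code appears later
   in the full source. */
#if 0
#include <unistd.h>
static long long physical_ram_bytes(void)
{
  long pages = sysconf(_SC_PHYS_PAGES);  /* number of physical pages */
  long psize = sysconf(_SC_PAGE_SIZE);   /* bytes per page */
  if (pages <= 0 || psize <= 0)
    return GBs * 1024LL * 1024LL * 1024LL;  /* fall back to the hard-coded default */
  return (long long)pages * (long long)psize;
}
#endif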
/*< <em>(A note on 64-bit architecture)</em> */
// When I first wrote this code I tried to make it optional whether you
// used 32 bit or 64 bit data, to allow for a version for people on older
// processors. However at this point I doubt that anyone is trying to do
// gene assembly on 32 bit architectures and if they were, they wouldn't
// be able to handle very large data, so I intend to simplify the code now
// and assume 64-bit "long long" data throughout.
typedef unsigned long long EDGE;
typedef unsigned long long INDEX;
#define ENDS_WORD (1ULL<<63ULL)
#define EDGE_MASK (ENDS_WORD-1ULL)
/*>*/
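/* Illustrative sketch (excluded from compilation with #if 0): how
   ENDS_WORD and EDGE_MASK are meant to be combined.  A terminal edge has
   the top bit set and carries the READ's sequence number in the low 63
   bits; an internal edge simply holds the index of the next cell.  The
   helper names are invented for this sketch; the real code does the same
   masking inline (see local_add_read below). */
#if 0
static EDGE  make_terminal_edge(INDEX read_number) { return ENDS_WORD | read_number; }
static int   edge_is_terminal(EDGE e)              { return (e & ENDS_WORD) != 0ULL; }
static INDEX edge_payload(EDGE e)                  { return e & EDGE_MASK; } /* read number, or index of next cell */
#endif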
/*< THE MAIN DATA STRUCTURE */
// the structure below is a 5-way branching trie. Each edge contains a
// pointer (actually an index) to other edges in the array, and packed
// into each edge is a bit to denote 'end of word' (ie the last letter
// in a READ). To simplify the structure I have removed earlier support
// for multiple-length strings. Now all strings must be the same
// length. The actual length is not important, just that every string
// added to the data structure is the same length as every other.
// We could have saved some space by using a 4-way trie rather than a
// 5-way trie, but I included the 'N' code in reads in the hope of
// future-proofing the code. It's not much of an overhead.
// If I occasionally refer to this structure in the code below as the 'DAWG'
// that is because it is a variation of a structure used heavily in spelling
// checkers/correctors and in word games such as boggle, scrabble, etc.
// The mnemonic stands for "Directed Acyclic Word Graph".
// In this case we stay with the functionally compatible trie and do not
// perform the tail-compression that would convert the trie into a dag.
// (This is forced on us by the fact that we tag each READ at the end of
// its trie with the sequence number of the READ so that we can build
// the connectivity/overlap graph, thus the tails are no longer identical)
// Note: although not done yet, the use of a DAWG allows for cheap detection
// of 1-letter errors. (Actually n-letters for any n, but I don't know
// enough about the problem domain to know if n>1 is helpful?)
// See also...
// http://www.slideshare.net/agbiotec/overview-of-genome-assembly-algorithms
// for some background on the whole process
// A dynamically-allocated array [0..MAX_SIZE-1] of these trie edge cells forms the main data structure.
// The array is partitioned across all available compute nodes. We run one task ("PE") per
// compute-node and give it all available memory. The code will work if allocated to one
// task per core instead, but engenders an unnecessary inter-node communication overhead.
typedef struct cell {
// ACGT maps to 0 1 2 3, anything else such as 'N' is 4
EDGE edge[5];
} CELL;
CELL *trie_cell;
static INDEX MAX_SIZE = ((INDEX)0L);
#define ROOT_CELL ((INDEX)1L)
// Node 0 is unused, 0 is needed as a terminator.
// originally I used 'next_free_edge' and it was exclusive. Have now changed that
// to 'last_used_edge' which is inclusive. getting a free edge is now a function
// call that we can make atomic with an OMP PRAGMA.
static INDEX last_used_edge = ROOT_CELL; // should this be 0L instead since it is inclusive? ROOT_CELL is special, but where do I initialise it?
static CELL empty; // A constant cell, containing 0LL in all five edges.
// trie_cell[edge].edge[_G_] will:
//   have ENDS_WORD clear and point to 0 if there is no G at that position;
//   have ENDS_WORD clear and point to another trie_cell if it is a G but not the last letter in the string;
//   have ENDS_WORD set if G is the last letter in the string, in which case it holds the read number.
/*< <em>('To do' notes on the data structure)</em> */
// It would be more consistent if this were shared among all nodes.
// the fact that it is not is only OK because only the last node is
// likely to be the target of adding a new edge, but I should code
// an explicit test for this just to be sure...
// this output confirms that we need to feed back updates of last_used_edge to all nodes :-( (200m test, 3 nodes, 64 bit):
// maketrie.721908.out:Node 2 exiting cleanly. local base = 1073741824, last_used_edge = 1602285502, local maximum = 1610612736
// maketrie.721908.out:Node 1 exiting cleanly. local base = 536870912, last_used_edge = 1096686229, local maximum = 1073741824
// *OR* need to restructure add_read so that a call to the next node does the increment implicitly, leaving our
// own last_used_edge permanently stuck at local maximum. eg the_new_edge = add_read_at_last_used_edge(...)
/*>*//*>*/
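/* Illustrative sketch (excluded from compilation with #if 0): the local
   half of the edge allocator mentioned above, made safe for multi-threaded
   trie building with an OMP critical section.  The function name is
   invented for this sketch; the real get_next_free_edge (declared further
   down) must also hand allocation over to the next rank once this node's
   chunk is full, which is omitted here. */
#if 0
static INDEX local_get_next_free_edge(void)
{
  INDEX new_edge;
#pragma omp critical(edge_allocator)
  {
    new_edge = ++last_used_edge;  /* last_used_edge is inclusive, so pre-increment */
  }
  return new_edge;
}
#endif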
/*< (minor housekeeping: translate gene letters ACGTN to internal codes 0..4 on input and output */
#define _A_ 0
#define _C_ 1
#define _G_ 2
#define _T_ 3
#define _N_ 4
char *trt = "ACGTN"; // translate table back from the above
/*>*/
/*< (Statistics gathering) */
// Actually, the stats need to be gathered across multiple nodes, which is not
// currently implemented. This data isn't used anywhere, so we could do away with
// this altogether, or when all the important code is written we could revisit
// this and implement it properly using an RPC call to communicate the data
// across each of the nodes.
static long freq[256];
static long letters = 0L;
static int seq = 0, dups = 0; // should seq be a long int? Do we get that many reads?
/*>*/
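/* Illustrative sketch (excluded from compilation with #if 0): one simple
   way the per-node statistics above could be combined at the end of a run,
   using an MPI collective instead of the RPC mechanism used elsewhere in
   this file.  gather_stats is a name invented for this sketch; every rank
   would have to call it, with rank 0 receiving the totals. */
#if 0
static void gather_stats(void)
{
  long total_freq[256];
  long total_letters = 0L;
  MPI_Reduce(freq, total_freq, 256, MPI_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
  MPI_Reduce(&letters, &total_letters, 1, MPI_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
  if (mpirank == 0)
    fprintf(stderr, "Total letters across all nodes: %ld\n", total_letters);
}
#endif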
/*< Input of reads. "fastq" format etc. */
// There is a library for reading fastq format which I may use later, but the
// format is so trivial I currently handle it ad hoc on the fly.
// We do want to optimise the I/O heavily as we are using such large
// files that I/O overhead becomes noticable (at least, if we are
// successful in getting the actual processing to be so fast that the
// I/O is no longer swamped by the processing times).
#define MAX_LINE 1024 // Longest input line. We're looking at lines of
// 37, 60, 70, 101 etc - maybe even in the 400's
// in some cases, so 1K should be overkill.
// I want to avoid all use of the heap if possible
// in order to make memory usage predictable.
// We check the length of all READs and raise an error if they are not all the same.
static long length[MAX_LINE];
static int read_length = 0; // This is the length we found in the READ file.
/*>*/
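/* Illustrative sketch (excluded from compilation with #if 0): the ad hoc
   fastq handling described above amounts to consuming records of four
   lines (@id, sequence, '+', quality) and keeping only the sequence line.
   next_read is a name invented for this sketch and is not the program's
   actual input routine. */
#if 0
static int next_read(FILE *f, char *seq /* buffer of at least MAX_LINE bytes */)
{
  char header[MAX_LINE], plus[MAX_LINE], qual[MAX_LINE];
  if (fgets(header, MAX_LINE, f) == NULL) return FALSE;  /* '@' identifier line, or EOF */
  if (fgets(seq,    MAX_LINE, f) == NULL) return FALSE;  /* the READ itself */
  if (fgets(plus,   MAX_LINE, f) == NULL) return FALSE;  /* '+' separator line */
  if (fgets(qual,   MAX_LINE, f) == NULL) return FALSE;  /* quality scores */
  return TRUE;
}
#endif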
/*< MPI support. */
//------------------------------------------------------------------------
// This hard-wired default shouldn't be needed now unless we
// fail to grope the peripherals correctly...
// thumper: #define CORES_PER_NODE 12ULL
// stampede:
#define CORES_PER_NODE 16ULL
// MPI support:
static int mpirank = 0, mpisize = 1;
// Used in a calculation to determine how much memory each task on a node
// should allocate for itself. If we have more than one task per node,
// the first one is in danger of claiming all available memory and the
// subsequent ones will fail to claim memory, resulting in an error (or
// worse, possibly a crash)
static long long TASKS_PER_NODE, PROCESSORS_PER_NODE;
// for example with 16 processors per node we can either run 16 tasks
// each with 1/16 of the available memory, or 1 task using all the memory
/*>*/
/*< The code is structured around a simulated Remote Procedure Call mechanism,
implemented on top of MPI. Rank 0 remains an imperative main program
but all the other ranks run MPI listeners to handle procedure calls
originating in rank 0. */
// All MPI calls made by the program must follow the same template.
// 1) send the command code. This may include one optional 'long' data item. No Ack.
// 2) Send any more parameters. Send multiple items in a single call when possible.
// 3) Receive results
// 4) if no results expected, receive empty 'done' acknowledgement
// NOTE. Callers should be single-threaded *FOR NOW* - don't ever send two requests in parallel
// however receivers can handle more than one caller in parallel (one call per calling system)
// It should be possible to do trie-building in parallel as long as we lock critical
// sections of local_add_read with omp pragmas. Critical sections being where an
// element is updated. (The primitive has to be read-modify-write, not write.)
// Receiving process should:
// 1) receive command code.
// 2) if more parameters are to follow, receive them *from caller*
// 3) once all parameters are in (or if there were none), execute the local command
// 4) send results to caller
// 5) if no results, send 'done' acknowledgement to caller
// Note that in the next iteration, assuming it is safe, we can drop the acknowledgements
// and at some point we hope to use OMP to allow parallel dispatch loops.
// Data:
#define TAG_DATA 1
#define TAG_ACK 2
// Low-level data access commands
#define TAG_SEND_RAW_MEM 3
#define TAG_READ_READ 4
#define TAG_WRITE_READ 5
#define TAG_EXIT_PROGRAM 6
// Application-level Remote Procedure Calls
#define TAG_ADD_READ 7
#define TAG_GET_NEXT_FREE_EDGE 8
#define TAG_OUTPUT_DUPINFO 9
#define TAG_OUTPUT_READ 10
#define TAG_WALK_AND_PRINT_TRIE_INTERNAL 13
#define TAG_DUMP_TRIE 14
/*>*/
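/* Illustrative sketch (excluded from compilation with #if 0): the shape of
   the receiving side implied by the protocol above.  Every rank other than
   0 loops receiving a command code whose MPI tag is one of the TAG_*
   values, gathers any further parameters, performs the local operation and
   then replies with results or a TAG_ACK.  dispatch_loop is a name
   invented for this sketch; the program's real listener appears later in
   the full source. */
#if 0
static void dispatch_loop(void)
{
  long value;
  MPI_Status status;
  for (;;) {
    MPI_Recv(&value, 1, MPI_LONG, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
    switch (status.MPI_TAG) {
    case TAG_EXIT_PROGRAM:
      MPI_Send(&value, 1, MPI_LONG, status.MPI_SOURCE, TAG_ACK, MPI_COMM_WORLD);
      return;
    case TAG_WRITE_READ:
      /* receive the index and cell contents, store them locally, then acknowledge */
      break;
    case TAG_READ_READ:
      /* receive the index, send the requested cell contents back */
      break;
    /* ... one case per remaining TAG_* command ... */
    default:
      break;
    }
  }
}
#endif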
/*< Create a virtual array using the memory of as many nodes as are available.
Use one process per node and give it all available memory.
We assume that memory rather than CPU is the bottleneck with our algorithm - we
don't need high concurrency - one processor would be enough, if there were
sufficient RAM available. Until the day when we routinely have onboard memories
measured in Terabytes, this will have to do as a slow approximation.
(Need to compare to speed using vSMP system built out of 8 nodes)
This paper may have some relevance: http://www.cs.berkeley.edu/~bonachea/upc/mpi2.html
If we use higher-level calls (eg add_string_to_trie) then in most cases we'll have one
cross-machine call per hardware node or less, rather than one for every letter in the READ. */
// initially these were #define'd but now we work them out on the fly...
// Chunk is a power of two. It is a count of trie cells, not a size of RAM
// There are CHUNKSIZE trie cells per instance of this code. Multiply by
// mpisize for total size across all nodes. (MAX_SIZE)
static long long CHUNKBITS, CHUNKSIZE, CHUNKMASK;
/*>*//*>*/
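/* Illustrative sketch (excluded from compilation with #if 0): how a global
   trie index splits into an owning rank and a local offset once CHUNKBITS
   and CHUNKMASK have been set up.  This is the same arithmetic used inline
   by setread() and getread() below; the helper names are invented for this
   sketch. */
#if 0
static int   owner_of(INDEX index)        { return (int)(index >> CHUNKBITS); }
static INDEX local_offset_of(INDEX index) { return index & CHUNKMASK; }
/* A cell is local when owner_of(index) == mpirank, and is then found at
   trie_cell[local_offset_of(index)]. */
#endif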
/*< Procedures */
/*< Support procedures. */
static void shut_down_other_nodes(void) // what the user calls
{
int target_rank;
long int value = 0L;
MPI_Status status;
for (target_rank = 1; target_rank < mpisize; target_rank++) {
if (target_rank != mpirank) { // not me. Or 0, unfortunately.
MPI_Send(&value,/* message buffer - data part and trigger to perform operation */
1,/* one data item */
MPI_LONG,/* data value is a long integer */
target_rank,/* destination process rank */
TAG_EXIT_PROGRAM, // The command.
MPI_COMM_WORLD);/* always use this */
// Get acknowledgement that it has been written (to synchronise)
MPI_Recv(&value,/* message buffer - not used */
1,/* one data item */
MPI_LONG,/* response is a long int for now (temp) */
/*rank,*/ MPI_ANY_SOURCE,/* receive from any sender */
/* TAG_ACK,*/ MPI_ANY_TAG,/* receive any type of message */
MPI_COMM_WORLD,/* always use this */
&status);/* info about received message */
}
}
}
// Send raw memory via MPI_Send.
// Using this assumes a homogeneous cluster (same byte sex everywhere). Not a problem for us.
// If used on structs rather than arrays, remember to handle machine-dependent padding issues.
static void send_bytes(int dest, void *memp, long bytes) { // in effect just a type-checked wrapper round MPI_Send
MPI_Send(memp, bytes, MPI_BYTE, dest, TAG_SEND_RAW_MEM, MPI_COMM_WORLD);
}
/*>*/
/*< remote_*: these procedures call a later rank. These are mostly internal housekeeping procedures, not application calls */
static void remote_walk_and_print_trie_internal(int target_rank, char *s, EDGE edge, int len)
{
MPI_Status status;
long value = 0L; // generic up-front parameter
long stringlength;
stringlength = strlen(s)+1;
// COMMAND CODE
MPI_Send(&value,/* message buffer - data part and trigger to perform operation */
1,/* one data item */
MPI_LONG,/* data value is a long integer */
target_rank,/* destination process rank */
TAG_WALK_AND_PRINT_TRIE_INTERNAL,/* user chosen message tag */
MPI_COMM_WORLD);/* always use this */
// SEND PARAMETERS - later optimise by sending one param via 'value' above
MPI_Send(&stringlength,/* message buffer - address part */
1,/* one data item */
MPI_LONG,/* index is a long long integer */
target_rank,/* destination process rank */
TAG_DATA,/* user chosen message tag */
MPI_COMM_WORLD);/* always use this */
send_bytes(target_rank, s, stringlength);
MPI_Send(&edge,
1,/* one data item */
MPI_LONG,/* index is a long long integer */
target_rank,/* destination process rank */
TAG_DATA,/* user chosen message tag */
MPI_COMM_WORLD);/* always use this */
MPI_Send(&len,
1,/* one data item */
MPI_INT,/* len is an int */
target_rank,/* destination process rank */
TAG_DATA,/* user chosen message tag */
MPI_COMM_WORLD);/* always use this */
// Get acknowledgement that it has been written (to synchronise)
MPI_Recv(&value,/* message buffer - used for len function result */
1,/* one data item */
MPI_LONG,/* response is a long int for now (temp) */
MPI_ANY_SOURCE,/* receive from any sender */
MPI_ANY_TAG,/* receive any type of message */
MPI_COMM_WORLD,/* always use this */
&status);/* info about received message */
}
static void remote_dump_trie(int target_rank, char *filename)
{
MPI_Status status;
long value = 0L; // generic up-front parameter
long stringlength;
stringlength = strlen(filename)+1;
// COMMAND CODE
MPI_Send(&value,/* message buffer - data part and trigger to perform operation */
1,/* one data item */
MPI_LONG,/* data value is a long integer */
target_rank,/* destination process rank */
TAG_DUMP_TRIE,/* user chosen message tag */
MPI_COMM_WORLD);/* always use this */
// SEND PARAMETERS - later optimise by sending one param via 'value' above
MPI_Send(&stringlength,/* message buffer - address part */
1,/* one data item */
MPI_LONG,/* index is a long long integer */
target_rank,/* destination process rank */
TAG_DATA,/* user chosen message tag */
MPI_COMM_WORLD);/* always use this */
send_bytes(target_rank, filename, stringlength);
// Get acknowledgement that it has been written (to synchronise)
MPI_Recv(&value,/* message buffer - used for len function result */
1,/* one data item */
MPI_LONG,/* response is a long int for now (temp) */
MPI_ANY_SOURCE,/* receive from any sender */
MPI_ANY_TAG,/* receive any type of message */
MPI_COMM_WORLD,/* always use this */
&status);/* info about received message */
}
// what setread calls internally to ask another node to assign to a trie cell
static void remote_setread(int target_rank, INDEX index, CELL value)
{
long int dummy = 0L; // initialised so we never send indeterminate memory
MPI_Status status;
//static int debug = 0;
// write this data in the part of the virtual array held on another node
// The code is essentially serial - we are only using the other nodes
// as a cache for memory
// This is just a sketch from some example pages on the net - NOT the actual code we need:
// some caveats about sending multiple messages:
// http://stackoverflow.com/questions/2024214/mpi-buffered-send-receive-order
assert(target_rank > mpirank);
MPI_Send(&dummy,/* message buffer - data part and trigger to perform operation */
1,/* one data item */
MPI_LONG,/* data value is a long integer */
target_rank,/* destination process rank */
TAG_WRITE_READ,/* user chosen message tag */
MPI_COMM_WORLD);/* always use this */
MPI_Send(&index,/* message buffer - address part */
1,/* one data item */
MPI_LONG_LONG,/* index is a long long integer */
target_rank,/* destination process rank */
TAG_DATA,/* user chosen message tag */
MPI_COMM_WORLD);/* always use this */
// BEWARE!!!! On the system I'm using, sizeof(long) == sizeof(long long).
// So the options to select word size are not being properly tested.
// I do have an older 32-bit machine which supports "long long" in
// software, and I would expect it to complain about a few of these
// assertions.
// What we *should* be doing is use:
// (sizeof(INDEX) == sizeof(long long) ? MPI_LONG_LONG : MPI_LONG)
// in the MPI arguments.
assert(sizeof(value.edge[0]) == sizeof(long));
MPI_Send(&value.edge,/* message buffer - data part */
sizeof(value.edge)/sizeof(value.edge[0]),/* number of data items */
MPI_LONG,/* index is a long long integer */
target_rank,/* destination process rank */
TAG_DATA,/* user chosen message tag */
MPI_COMM_WORLD);/* always use this */
// Get acknowledgement that it has been written (to synchronise)
// We are being very careful here to avoid parallelism
// Next iteration once all the code is working reliably will be
// to fire & forget (ie remove the call below)
MPI_Recv(&value,/* message buffer - not used */
1,/* one data item */
MPI_LONG,/* response is a long int for now (temp) */
MPI_ANY_SOURCE,/* receive from any sender */
MPI_ANY_TAG,/* receive any type of message */
MPI_COMM_WORLD,/* always use this */
&status);/* info about received message */
}
// what getread calls internally to ask another node to supply us with a trie cell
static void remote_getread(int target_rank, INDEX index, CELL *valuep)
{
MPI_Status status;
long dummy = 0L; // initialised so we never send indeterminate memory
// read data from the part of the virtual array held on another node
// The code is essentially serial - we are only using the other nodes
// as a cache for memory
// This is just a sketch from some example pages on the net - NOT the actual code we need:
MPI_Send(&dummy,/* message buffer - not actually used */
1,/* one data item */
MPI_LONG,/* data item is a long integer */
target_rank,/* destination process rank */
TAG_READ_READ,/* user chosen message tag */
MPI_COMM_WORLD);/* always use this */
MPI_Send(&index,/* message buffer - address part */
1,/* one data item */
MPI_LONG_LONG,/* index is a long long integer */
target_rank,/* destination process rank */
TAG_DATA,/* user chosen message tag */
MPI_COMM_WORLD);/* always use this */
assert(sizeof(valuep->edge[0]) == sizeof(long));
MPI_Recv(valuep,/* message buffer */
sizeof(valuep->edge)/sizeof(valuep->edge[0]),/* number of data items */
MPI_LONG,/* response is a long int for now (temp) */
MPI_ANY_SOURCE,/* receive from any sender */
MPI_ANY_TAG,/* receive any type of message */
MPI_COMM_WORLD,/* always use this */
&status);/* info about received message */
}
/*>*/
/*< (internal support for accessing virtual array) */
// setread and getread are unlikely to be used except possibly in debugging. They allow us
// to access individual elements of the virtual array regardless of which rank the data is
// stored on. Not to be used for high numbers of accesses as this is a relatively
// inefficient mechanism (compared to add_read which takes great advantage of locality)
// These do come in useful when prototyping a new procedure but once the logic has been
// worked out with an implementation in Node 0, it is almost always more efficient to
// use our 'migrating process' trick to run the logic local to whichever node hosts the
// relevant data.
static void setread(INDEX index, CELL value) // what the user calls
{
int target_rank;
target_rank = index >> CHUNKBITS;
if (target_rank == mpirank) {
int i;
for (i = 0; i < 5; i++) trie_cell[index & CHUNKMASK].edge[i] = value.edge[i];
} else {
if (target_rank < mpirank) {
fprintf(stderr, "PROGRAM BUG: Node %d requested access to trie_cell[%lld] on node %d - assert that we never feed backwards...\n", mpirank, index, target_rank);
shut_down_other_nodes();
MPI_Finalize();
exit(EXIT_FAILURE);
}
if (target_rank >= mpisize) {
fprintf(stderr, "ERROR: array bounds exceeded! Requested access to trie_cell[%lld]\n", index);
shut_down_other_nodes();
MPI_Finalize();
exit(EXIT_FAILURE);
}
remote_setread(target_rank, index, value);
}
}
static void getread(INDEX index, CELL *valuep) // what the user calls
{
int target_rank;
target_rank = index >> CHUNKBITS;
if (target_rank == mpirank) {
int i;
for (i = 0; i < 5; i++) valuep->edge[i] = trie_cell[index & CHUNKMASK].edge[i];
} else {
if (target_rank < mpirank) {
fprintf(stderr, "PROGRAM BUG: Node %d requested access to trie_cell[%lld] on node %d - assert that we never feed backwards...\n", mpirank, index, target_rank);
shut_down_other_nodes();
MPI_Finalize();
exit(EXIT_FAILURE);
}
if (target_rank >= mpisize) {
fprintf(stderr, "ERROR: array bounds exceeded! Requested access to trie_cell[%lld]\n", index);
shut_down_other_nodes();
MPI_Finalize();
exit(EXIT_FAILURE);
}
remote_getread(target_rank, index, valuep);
}
}
/*>*/
//------------------------------------------------------------------------
/*< local_*: these procedures all execute locally to the caller. This
is where the actual work is done. However the users do not call
these directly (except possibly in a few cases where some low-level
optimisation is a necessity). <b><tt>local_add_read</tt></b> is the
primary work-horse procedure that implements our algorithm. */
static int add_read(char *s, EDGE edge, long read_number, int len);
static INDEX get_next_free_edge(void);
// If we allow multi-threading, this procedure is where all the critical
// sections will be. Any updates to cells must be done carefully as
// protected read/modify/writes.
static int local_add_read(char *s, EDGE edge, long read_number, int len)
{
int c;
assert((edge >> CHUNKBITS) == mpirank);
assert((*s != '\0') && (*s != '\n') && (*s != '\r'));
//for (;;) { // on a single node, can optimise tail-recursion by making this a loop...
c = *s++;
letters++; // for diag only - NOT SHARED - WON'T WORK IN DISTRIBUTED IMPLEMENTATION!
freq[c]++; // letters and freq may need to be long longs
if (c == 'A') c = _A_;
else if (c == 'C') c = _C_;
else if (c == 'G') c = _G_;
else if (c == 'T') c = _T_;
else c = _N_; // some other char
if ((*s == '\0') || (*s == '\n') || (*s == '\r')) {
if (trie_cell[edge&CHUNKMASK].edge[c]&ENDS_WORD) {
long original_read = trie_cell[edge&CHUNKMASK].edge[c]&EDGE_MASK;
// Take note of 100% overlaps (dups) as we find them, and don't enter
// the second or later copies in the trie.
fprintf(duplicates, "%ld:0 %ld\n", original_read, read_number);
if (ferror(duplicates)) {
fprintf(stderr, "\n\n************* add_read() (duplicates) failed, %s\n", strerror(errno));
shut_down_other_nodes();
MPI_Finalize();
exit(EXIT_FAILURE);
}
dups++; // for diag only - NOT SHARED - WON'T WORK IN DISTRIBUTED IMPLEMENTATION!
} else {
assert((trie_cell[edge&CHUNKMASK].edge[c]&EDGE_MASK) == 0);
trie_cell[edge&CHUNKMASK].edge[c] = (ENDS_WORD | read_number);
}
return len+1;
}
// WE NO LONGER SUPPORT STRINGS OF DIFFERING LENGTHS
if (trie_cell[edge&CHUNKMASK].edge[c] == 0LL) { // we need to extend
INDEX new_edge = get_next_free_edge(); // Might return an edge from another compute node! // TO DO: return an *EMPTY* edge
setread(new_edge, empty); // slight overhead if local. optimise later.
trie_cell[edge&CHUNKMASK].edge[c] = new_edge;
if (new_edge >= MAX_SIZE) {
fprintf(stderr, "Ran out of free edges after %d reads (last_used_edge = %lld, MAX_SIZE = %lld)\n", seq, new_edge, MAX_SIZE);
shut_down_other_nodes();
MPI_Finalize();
exit(EXIT_FAILURE);
}
} else { // we need to merge
// ... with existing trie_cell[edge&CHUNKMASK].edge[c]
}
// This add_read call is actually the smart bit of the algorithm because it hands over to
// another node only when it has to. An alternative choice would have been to fetch/store
// each individual trie cell, but the overhead of that was horrendous.
// CALL NEXT PROCESSOR IF 'edge' IS TOO HIGH.
return add_read(/* modified */ s, trie_cell[edge&CHUNKMASK].edge[c] /* may be on another node */, read_number, len+1);
//edge = trie_cell[edge].edge[c]&EDGE_MASK; // Local version can be optimised by removal of tail recursion...
//} // and converted into a loop.
}
/*>*/
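/* Illustrative sketch (excluded from compilation with #if 0): a plausible
   shape for the add_read() dispatcher declared above, based on the chunk
   partitioning just described - it stays local while the target edge lives
   in this rank's chunk and otherwise hands the rest of the READ to the
   owning rank.  The name add_read_sketch is invented here; the real
   definition of add_read appears later in the full source. */
#if 0
static int add_read_sketch(char *s, EDGE edge, long read_number, int len)
{
  int target_rank = (int)(edge >> CHUNKBITS);
  if (target_rank == mpirank)
    return local_add_read(s, edge, read_number, len);                     /* stay on this node */
  return remote_add_read(target_rank, s, (long)edge, read_number, len);   /* hand over to the owning rank */
}
#endif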
/*< remote_*: these procedures call a later rank. These are user-application calls. */
static INDEX remote_get_next_free_edge(int target_rank)
{
MPI_Status status;
long value = 0L; // generic up-front parameter
INDEX free_edge;
assert(target_rank != mpirank);
// COMMAND CODE
MPI_Send(&value,/* message buffer - data part and trigger to perform operation */
1,/* one data item */
MPI_LONG,/* data value is a long integer */
target_rank,/* destination process rank */
TAG_GET_NEXT_FREE_EDGE,/* user chosen message tag */
MPI_COMM_WORLD);/* always use this */
// NO PARAMS
assert(sizeof(INDEX) == sizeof(long long));
MPI_Recv(&free_edge,/* message buffer - used for len function result */
1,/* one data item */
MPI_LONG_LONG,/* response is a long int for now (temp) */
MPI_ANY_SOURCE,/* receive from any sender */
MPI_ANY_TAG,/* receive any type of message */
MPI_COMM_WORLD,/* always use this */
&status);/* info about received message */
// Get acknowledgement that call is complete (to synchronise)
MPI_Recv(&value,/* message buffer - unused */
1,/* one data item */
MPI_LONG,/* response is a long int for now (temp) */
MPI_ANY_SOURCE,/* receive from any sender */
MPI_ANY_TAG,/* receive any type of message */
MPI_COMM_WORLD,/* always use this */
&status);/* info about received message */
return free_edge;
}
static int remote_add_read(long target_rank, char *s, long edge, long read_number, int len) // pass to another node
{
MPI_Status status;
long value = 0L; // generic up-front parameter
long stringlength;
assert(target_rank != mpirank);
stringlength = strlen(s)+1;
// COMMAND CODE
MPI_Send(&value,/* message buffer - data part and trigger to perform operation */
1,/* one data item */
MPI_LONG,/* data value is a long integer */
target_rank,/* destination process rank */
TAG_ADD_READ,/* user chosen message tag */
MPI_COMM_WORLD);/* always use this */
// SEND PARAMETERS - later optimise by sending one param via 'value' above
MPI_Send(&stringlength,/* message buffer - address part */
1,/* one data item */
MPI_LONG,/* index is a long long integer */
target_rank,/* destination process rank */
TAG_DATA,/* user chosen message tag */
MPI_COMM_WORLD);/* always use this */
send_bytes(target_rank, s, stringlength);
assert(sizeof(EDGE) == sizeof(long));
MPI_Send(&edge,
1,/* one data item */
MPI_LONG,/* index is a long long integer */
target_rank,/* destination process rank */
TAG_DATA,/* user chosen message tag */
MPI_COMM_WORLD);/* always use this */
assert(sizeof(read_number) == sizeof(long));
MPI_Send(&read_number,
1,/* one data item */
MPI_LONG,/* index is a long long integer */
target_rank,/* destination process rank */
TAG_DATA,/* user chosen message tag */
MPI_COMM_WORLD);/* always use this */
assert(sizeof(len) == sizeof(int));
MPI_Send(&len,
1,/* one data item */
MPI_INT,/* len is an int */
target_rank,/* destination process rank */
TAG_DATA,/* user chosen message tag */
MPI_COMM_WORLD);/* always use this */
// Get acknowledgement that it has been written (to synchronise)
MPI_Recv(&value,/* message buffer - used for len function result */
1,/* one data item */
MPI_LONG,/* response is a long int for now (temp) */
MPI_ANY_SOURCE,/* receive from any sender */
MPI_ANY_TAG,/* receive any type of message */
MPI_COMM_WORLD,/* always use this */
&status);/* info about received message */
// If len were not needed we could fire & forget, by removing the Recv above...
return (int)value;
}
static void remote_output_read(long target_rank, char *s, EDGE readindex) // pass to another node
{
MPI_Status status;
long value = 0L; // generic up-front parameter
long stringlength;
assert(target_rank != mpirank);
stringlength = strlen(s)+1;
// COMMAND CODE
MPI_Send(&value,/* message buffer - data part and trigger to perform operation */
1,/* one data item */
MPI_LONG,/* data value is a long integer */
target_rank,/* destination process rank */
TAG_OUTPUT_READ,/* user chosen message tag */
MPI_COMM_WORLD);/* always use this */
// SEND PARAMETERS - later optimise by sending one param via 'value' above
MPI_Send(&stringlength,
1,/* one data item */
MPI_LONG,/* index is a long long integer */
target_rank,/* destination process rank */
TAG_DATA,/* user chosen message tag */
MPI_COMM_WORLD);/* always use this */
send_bytes(target_rank, s, stringlength);
MPI_Send(&readindex,
1,/* one data item */
MPI_LONG_LONG,/* data value is a long integer */
target_rank,/* destination process rank */
TAG_DATA,/* user chosen message tag */
MPI_COMM_WORLD);/* always use this */
// Get acknowledgement that it has been written (to synchronise)
MPI_Recv(&value,/* message buffer - used for len function result */
1,/* one data item */
MPI_LONG,/* response is a long int for now (temp) */
MPI_ANY_SOURCE,/* receive from any sender */