source: trunk/user/sort/sort.c @ 635

Last change on this file since 635 was 635, checked in by alain, 5 months ago

This version is a major evolution: The physical memory allocators,
defined in the kmem.c, ppm.c, and kcm.c files have been modified
to support remote accesses. The RPCs that were previously used
to allocate physical memory in a remote cluster have been removed.
This has been done to cure a dead-lock in case of concurrent page-faults.

This version 2.2 has been tested on a (4 clusters / 2 cores per cluster)
TSAR architecture, for both the "sort" and the "fft" applications.

  • Property svn:executable set to *
File size: 17.4 KB
Line 
1/*
2 * sort.c - Parallel sort
3 *
4 * Author     Cesar Fuguet Tortolero (2013)
5 *            Alain Greiner (2019)
6 *
7 * Copyright (c) UPMC Sorbonne Universites
8 *
9 * This is free software; you can redistribute it and/or modify it
10 * under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; version 2.0 of the License.
12 *
13 * It is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16 * General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with ALMOS-MKH; if not, write to the Free Software Foundation,
20 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21 */
22
23///////////////////////////////////////////////////////////////////////////////
24// This multi-threaded application implements a multi-stage sort.
25// It has been written by Cesar Fuguet Tortolero in 2013.
26// It has been ported on ALMOS-MKH by Alain Greiner in 2019.
27//
28// There is one thread per physical core.
29// Computation is organised as a binary tree:
30// - All threads execute in parallel a bubble sort on a sub-array during
31//   the first stage of parallel sort,
32// - The number of participating threads is divided by 2 at each next stage,
33//   to make a merge sort, on two subsets of previous stage.
34//
35//       Number_of_stages = 1 bubble sort stage + log2(Number_of_threads) merge stages
36//
37// The various stages are separated by synchronisation barriers, and the
38// main thread uses the join syscall to check that all threads completed
39// before printing the computation time (sequential & parallel).
40// These results can be - optionally - registered in an instrumentation file.
41//
42// Constraints :
43// - It supports up to 1024 cores: x_size, y_size, and ncores must be
44//   power of 2 (max 16*16 clusters / max 4 cores per cluster)
45// - The array of values to be sorted (ARRAY_LENGTH) must be power of 2
46//   larger than the number of cores.
47///////////////////////////////////////////////////////////////////////////////
48
49#include <stdio.h>
50#include <stdlib.h>
51#include <unistd.h>
52#include <pthread.h>
53#include <almosmkh.h>
54#include <hal_macros.h>
55
56#define ARRAY_LENGTH        128        // number of items to sort (main requires a multiple of the number of threads)
57#define MAX_THREADS         1024       // 16 * 16 * 4 : size of the per-thread tables (trdid, attr, arg)
58
59#define USE_DQT_BARRIER     1          // use DQT barrier if non zero (simple barrier otherwise)
60#define DISPLAY_ARRAY       0          // display items values before sort and after each stage
61#define DEBUG_MAIN          0          // trace main function
62#define DEBUG_SORT          0          // trace sort function
63#define CHECK_RESULT        0          // for debug : main checks the order of the sorted array
64#define INSTRUMENTATION     1          // register computation times on file
65
66/////////////////////////////////////////////////////////////
67// argument for the sort() function (one thread per core)
68/////////////////////////////////////////////////////////////
69
70typedef struct
71{
72    unsigned int threads;       // total number of threads (one per core)
73    unsigned int thread_uid;    // user index of this thread (0 to threads - 1)
74    unsigned int main_uid;      // user index of the thread running main()
75}
76args_t;
77
78//////////////////////////////////////////
79//      Global variables
80//////////////////////////////////////////
81
82int                 array0[ARRAY_LENGTH];    // values to sort (initialised by main, bubble-sorted in place)
83int                 array1[ARRAY_LENGTH];    // second buffer : merge stages alternate between array0 and array1
84
85pthread_barrier_t   barrier;                 // synchronisation barrier shared by all sort threads
86
87pthread_t           trdid[MAX_THREADS];      // kernel identifiers (indexed by thread user index)
88pthread_attr_t      attr[MAX_THREADS];       // thread attributes (indexed by thread user index)
89args_t              arg[MAX_THREADS];        // sort function arguments (indexed by thread user index)
90
91////////////////////////////////////
/*
 * Sorts, in place and in ascending order, the <length> items of <array>
 * starting at index <init_pos>. Items outside this window are not touched.
 *
 * array    : array containing the window to sort
 * length   : number of items in the window
 * init_pos : index of the first item of the window
 */
static void bubbleSort( int * array,
                        unsigned int length,
                        unsigned int init_pos )
{
    unsigned int remaining = length;    // size of the still unsorted prefix

    // each pass moves the largest item of the unsorted prefix to its end
    while( remaining > 1 )
    {
        const unsigned int last = init_pos + remaining - 1;
        unsigned int       pos;

        for( pos = init_pos ; pos < last ; pos++ )
        {
            const int left  = array[pos];
            const int right = array[pos + 1];

            if( left > right )
            {
                array[pos]     = right;
                array[pos + 1] = left;
            }
        }

        remaining--;
    }
}  // end bubbleSort()
113
114
115///////////////////////////////////
/*
 * Merges two sorted subsets A and B of <src>, each containing <length> items,
 * into 2 * <length> consecutive items of <dst>.
 * When the two head items are equal, the item of subset B is copied first.
 *
 * src            : source array
 * dst            : destination array
 * length         : number of items in each subset
 * init_pos_src_a : index of the first item of subset A in src
 * init_pos_src_b : index of the first item of subset B in src
 * init_pos_dst   : index of the first written item in dst
 */
static void merge( const int * src,
                   int       * dst,
                   int         length,
                   int         init_pos_src_a,
                   int         init_pos_src_b,
                   int         init_pos_dst )
{
    int taken_a = 0;                // number of items consumed in subset A
    int taken_b = 0;                // number of items consumed in subset B
    int out     = init_pos_dst;     // next index to write in dst

    // both subsets still have items : copy the smallest head
    while( (taken_a < length) && (taken_b < length) )
    {
        const int head_a = src[init_pos_src_a + taken_a];
        const int head_b = src[init_pos_src_b + taken_b];

        if( head_a < head_b )
        {
            dst[out] = head_a;
            taken_a++;
        }
        else
        {
            dst[out] = head_b;
            taken_b++;
        }
        out++;
    }

    // subset B is exhausted : copy the tail of subset A
    for( ; taken_a < length ; taken_a++ , out++ )
    {
        dst[out] = src[init_pos_src_a + taken_a];
    }

    // subset A is exhausted : copy the tail of subset B
    for( ; taken_b < length ; taken_b++ , out++ )
    {
        dst[out] = src[init_pos_src_b + taken_b];
    }
}  // end merge()
158
159//////////////////////////////////////
160static void sort( const args_t * ptr )
161{
162    unsigned int       i;
163    unsigned long long cycle;
164    unsigned int       cxy;
165    unsigned int       lid;
166
167    int              * src_array  = NULL;
168    int              * dst_array  = NULL;
169
170    // get core coordinates an date
171    get_core( &cxy , &lid );
172    get_cycle( &cycle );
173
174    unsigned int  thread_uid = ptr->thread_uid;
175    unsigned int  threads    = ptr->threads;
176    unsigned int  main_uid   = ptr->main_uid;
177
178#if DISPLAY_ARRAY
179unsigned int n;
180if( thread_uid == main_uid )
181{
182    printf("\n*** array before sort\n");
183    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
184}
185#endif
186
187    /////////////////////////////////
188    pthread_barrier_wait( &barrier ); 
189
190#if DEBUG_SORT
191if( thread_uid == 0 )
192printf("\n[sort] thread[%d] exit barrier 0\n", thread_uid );
193#endif
194
195    unsigned int  items      = ARRAY_LENGTH / threads;
196    unsigned int  stages     = __builtin_ctz( threads ) + 1;
197
198#if DEBUG_SORT
199if( thread_uid == 0 )
200printf("\n[sort] thread[%d] : start\n", thread_uid );
201#endif
202
203    bubbleSort( array0, items, items * thread_uid );
204
205#if DEBUG_SORT
206if( thread_uid == 0 )
207printf("\n[sort] thread[%d] : stage 0 completed\n", thread_uid );
208#endif
209
210    /////////////////////////////////
211    pthread_barrier_wait( &barrier ); 
212
213#if DEBUG_SORT
214if( thread_uid == 0 )
215printf("\n[sort] thread[%d] exit barrier 0\n", thread_uid );
216#endif
217
218#if DISPLAY_ARRAY
219if( thread_uid == main_uid )
220{
221    printf("\n*** array after bubble sort\n");
222    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
223}
224#endif
225
226    // the number of threads contributing to sort is divided by 2
227    // and the number of items is multiplied by 2 at each next stage
228    for ( i = 1 ; i < stages ; i++ )
229    {
230        if((i % 2) == 1)               // odd stage
231        {
232            src_array = array0;
233            dst_array = array1;
234        }
235        else                           // even stage
236        {
237            src_array = array1;
238            dst_array = array0;
239        }
240
241        if( (thread_uid & ((1<<i)-1)) == 0 )
242        {
243
244#if DEBUG_SORT
245if( thread_uid == 0 )
246printf("\n[sort] thread[%d] : stage %d start\n", thread_uid , i );
247#endif
248            merge( src_array, 
249                   dst_array,
250                   items << (i-1),
251                   items * thread_uid,
252                   items * (thread_uid + (1 << (i-1))),
253                   items * thread_uid );
254
255#if DEBUG_SORT
256if( thread_uid == 0 )
257printf("\n[sort] thread[%d] : stage %d completed\n", thread_uid , i );
258#endif
259        }
260
261        /////////////////////////////////
262        pthread_barrier_wait( &barrier );
263
264#if DEBUG_SORT
265if( thread_uid == 0 )
266printf("\n[sort] thread[%d] exit barrier %d\n", thread_uid , i );
267#endif
268
269#if DISPLAY_ARRAY
270if( thread_uid == main_uid )
271{
272    printf("\n*** array after merge %d\n", i );
273    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , dst_array[n] );
274}
275#endif
276
277    }  // en for stages
278
279    // all threads but the main thread exit
280    if( thread_uid != main_uid ) pthread_exit( NULL );
281
282} // end sort()
283
284
285/////////////////
286void main( void )
287{
288    int                    error;
289    unsigned int           x_size;             // number of rows
290    unsigned int           y_size;             // number of columns
291    unsigned int           ncores;             // number of cores per cluster
292    unsigned int           total_threads;      // total number of threads
293    unsigned int           thread_uid;         // user defined thread index
294    unsigned int           main_cxy;           // cluster identifier for main
295    unsigned int           main_x;             // X coordinate for main thread
296    unsigned int           main_y;             // Y coordinate for main thread
297    unsigned int           main_lid;           // core local index for main thread
298    unsigned int           main_uid;           // thread user index for main thread
299    unsigned int           x;                  // X coordinate for a thread
300    unsigned int           y;                  // Y coordinate for a thread
301    unsigned int           lid;                // core local index for a thread
302    unsigned int           n;                  // index in array to sort
303    pthread_barrierattr_t  barrier_attr;       // barrier attributes
304
305    unsigned long long     start_cycle;
306    unsigned long long     seq_end_cycle;
307    unsigned long long     para_end_cycle;
308
309    /////////////////////////
310    get_cycle( &start_cycle );
311 
312    // compute number of threads (one thread per core)
313    get_config( &x_size , &y_size , &ncores );
314    total_threads = x_size * y_size * ncores;
315
316    // get core coordinates and user index for the main thread
317    get_core( &main_cxy , & main_lid );
318    main_x   = HAL_X_FROM_CXY( main_cxy );
319    main_y   = HAL_Y_FROM_CXY( main_cxy );
320    main_uid = (((main_x * y_size) + main_y) * ncores) + main_lid; 
321
322    // checks number of threads
323    if ( (total_threads != 1)   && (total_threads != 2)   && (total_threads != 4)   && 
324         (total_threads != 8)   && (total_threads != 16 ) && (total_threads != 32)  && 
325         (total_threads != 64)  && (total_threads != 128) && (total_threads != 256) && 
326         (total_threads != 512) && (total_threads != 1024) )
327    {
328        printf("\n[sort error] number of cores must be power of 2\n");
329        exit( 0 );
330    }
331
332    // check array size
333    if ( ARRAY_LENGTH % total_threads) 
334    {
335        printf("\n[sort error] array size must be multiple of number of threads\n");
336        exit( 0 );
337    }
338
339    printf("\n[sort] main starts / %d threads / %d items / pid %x / cycle %d\n",
340    total_threads, ARRAY_LENGTH, getpid(), (unsigned int)start_cycle );
341
342    // initialize barrier
343    if( USE_DQT_BARRIER )
344    {
345        barrier_attr.x_size   = x_size; 
346        barrier_attr.y_size   = y_size;
347        barrier_attr.nthreads = ncores;
348        error = pthread_barrier_init( &barrier, &barrier_attr , total_threads );
349    }
350    else // use SIMPLE_BARRIER
351    {
352        error = pthread_barrier_init( &barrier, NULL , total_threads );
353    }
354
355    if( error )
356    {
357        printf("\n[sort error] cannot initialise barrier\n" );
358        exit( 0 );
359    }
360
361#if DEBUG_MAIN
362if( USE_DQT_BARRIER ) printf("\n[sort] main completes DQT barrier init\n");
363else                  printf("\n[sort] main completes simple barrier init\n");
364#endif
365
366    // Array to sort initialization
367    for ( n = 0 ; n < ARRAY_LENGTH ; n++ )
368    {
369        array0[n] = ARRAY_LENGTH - n - 1;
370    }
371
372#if DEBUG_MAIN
373printf("\n[sort] main completes array init\n");
374#endif
375
376    // launch other threads to execute sort() function
377    // on cores other than the core running the main thread
378    for ( x = 0 ; x < x_size ; x++ )
379    {
380        for ( y = 0 ; y < y_size ; y++ )
381        {
382            for ( lid = 0 ; lid < ncores ; lid++ )
383            {
384                // compute thread user index (continuous index)
385                thread_uid = (((x * y_size) + y) * ncores) + lid;
386
387                // set arguments for all threads
388                arg[thread_uid].threads      = total_threads;
389                arg[thread_uid].thread_uid   = thread_uid;
390                arg[thread_uid].main_uid     = main_uid;
391
392                // set thread attributes for all threads
393                attr[thread_uid].attributes = PT_ATTR_CLUSTER_DEFINED | PT_ATTR_CORE_DEFINED;
394                attr[thread_uid].cxy        = HAL_CXY_FROM_XY( x , y );
395                attr[thread_uid].lid        = lid;
396
397                if( thread_uid != main_uid )
398                {
399                    if ( pthread_create( &trdid[thread_uid],  // buffer for kernel identifier
400                                         &attr[thread_uid],   // thread attributes
401                                         &sort,               // entry function
402                                         &arg[thread_uid] ) ) // sort arguments
403                    {
404                        printf("\n[sort error] main cannot create thread %x \n", thread_uid );
405                        exit( 0 );
406                    }
407
408#if (DEBUG_MAIN & 1)
409printf("\n[sort] main created thread %x \n", thread_uid );
410#endif
411                }
412            }
413        }
414    }
415   
416    ///////////////////////////
417    get_cycle( &seq_end_cycle );
418
419#if DEBUG_MAIN
420printf("\n[sort] main completes sequencial init at cycle %d\n",
421(unsigned int)seq_end_cycle );
422#endif
423
424    // the main thread run also the sort() function
425    sort( &arg[main_uid] );
426
427    // wait other threads completion
428    for ( x = 0 ; x < x_size ; x++ )
429    {
430        for ( y = 0 ; y < y_size ; y++ )
431        {
432            for ( lid = 0 ; lid < ncores ; lid++ )
433            {
434                // compute thread continuous index
435                thread_uid = (((x * y_size) + y) * ncores) + lid;
436
437                if( thread_uid != main_uid )
438                {
439                    if( pthread_join( trdid[thread_uid] , NULL ) )
440                    {
441                        printf("\n[fft error] in main thread %d joining thread %d\n",
442                        main_uid , thread_uid );
443                        exit( 0 );
444                    }
445                   
446#if (DEBUG_MAIN & 1)
447printf("\n[fft] main thread %d joined thread %d\n", main_uid, thread_uid );
448#endif
449
450                }
451            }
452        }
453    }
454
455    ////////////////////////////
456    get_cycle( &para_end_cycle );
457
458    printf("\n[sort] main completes parallel sort at cycle %d\n", 
459    (unsigned int)para_end_cycle );
460
461    // destroy barrier
462    pthread_barrier_destroy( &barrier );
463
464#if CHECK_RESULT
465    int    success = 1;
466    int *  res_array = ( (total_threads ==   2) ||
467                         (total_threads ==   8) || 
468                         (total_threads ==  32) || 
469                         (total_threads == 128) || 
470                         (total_threads == 512) ) ? array1 : array0;
471
472    for( n=0 ; n<(ARRAY_LENGTH-2) ; n++ )
473    {
474        if ( res_array[n] > res_array[n+1] )
475        {
476            printf("\n[sort] array[%d] = %d > array[%d] = %d\n",
477            n , res_array[n] , n+1 , res_array[n+1] );
478            success = 0;
479            break;
480        }
481    }
482
483    if ( success ) printf("\n[sort] success\n");
484    else           printf("\n[sort] failure\n");
485#endif
486
487#if INSTRUMENTATION
488    char               name[64];
489    char               path[128];
490    unsigned long long instru_cycle;
491
492    // build file name
493    if( USE_DQT_BARRIER )
494    snprintf( name , 64 , "sort_dqt_%d_%d_%d", ARRAY_LENGTH, x_size * y_size, ncores );
495    else
496    snprintf( name , 64 , "sort_smp_%d_%d_%d", ARRAY_LENGTH, x_size * y_size, ncores );
497
498    // build file pathname
499    snprintf( path , 128 , "home/%s" , name );
500
501    // compute results
502    unsigned int sequencial = (unsigned int)(seq_end_cycle - start_cycle);
503    unsigned int parallel   = (unsigned int)(para_end_cycle - seq_end_cycle);
504
505    // display results on process terminal
506    printf("\n----- %s -----\n"
507           " - sequencial : %d cycles\n"
508           " - parallel   : %d cycles\n", 
509           name, sequencial, parallel );
510
511    // open file
512    get_cycle( &instru_cycle );
513    FILE * stream = fopen( path , NULL );
514
515    if( stream == NULL )
516    {
517        printf("\n[sort error] cannot open instrumentation file <%s>\n", path );
518        exit(0);
519    }
520
521    printf("\n[sort] file <%s> open at cycle %d\n", path, (unsigned int)instru_cycle );
522
523#if IDBG
524idbg();
525#endif
526
527    // register results to file
528    get_cycle( &instru_cycle );
529    int ret = fprintf( stream , "\n----- %s -----\n"
530                                " - sequencial : %d cycles\n"
531                                " - parallel   : %d cycles\n", name, sequencial, parallel );
532    if( ret < 0 )
533    {
534        printf("\n[sort error] cannot write to instrumentation file <%s>\n", path );
535        exit(0);
536    }
537
538    printf("\n[sort] file <%s> written at cycle %d\n", path, (unsigned int)instru_cycle );
539
540#if IDBG
541idbg();
542#endif
543
544    // close instrumentation file
545    get_cycle( &instru_cycle );
546    ret = fclose( stream );
547
548    if( ret )
549    {
550        printf("\n[sort error] cannot close instrumentation file <%s>\n", path );
551        exit(0);
552    }
553
554    printf("\n[sort] file <%s> closed at cycle %d\n", path, (unsigned int)instru_cycle );
555
556#endif
557
558    exit( 0 );
559
560}  // end main()
561
562/*
563vim: tabstop=4 : shiftwidth=4 : expandtab
564*/
Note: See TracBrowser for help on using the repository browser.