source: trunk/user/sort/sort.c @ 635

Last change on this file since 635 was 635, checked in by alain, 16 months ago

This version is a major evolution: The physical memory allocators,
defined in the kmem.c, ppm.c, and kcm.c files have been modified
to support remote accesses. The RPCs that were previously used
to allocate physical memory in a remote cluster have been removed.
This has been done to cure a deadlock in case of concurrent page-faults.

This version 2.2 has been tested on a (4 clusters / 2 cores per cluster)
TSAR architecture, for both the "sort" and the "fft" applications.

  • Property svn:executable set to *
File size: 17.4 KB
RevLine 
[635]1/*
2 * sort.c - Parallel sort
3 *
4 * Author     Cesar Fuguet Tortolero (2013)
5 *            Alain Greiner (2019)
6 *
7 * Copyright (c) UPMC Sorbonne Universites
8 *
9 * This is free software; you can redistribute it and/or modify it
10 * under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; version 2.0 of the License.
12 *
13 * It is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16 * General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with ALMOS-MKH; if not, write to the Free Software Foundation,
20 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21 */
22
[417]23///////////////////////////////////////////////////////////////////////////////
[635]24// This multi-threaded application implements a multi-stage sort.
25// It has been written by Cesar Fuguet Tortolero in 2013.
26// It has been ported to ALMOS-MKH by Alain Greiner in 2019.
27//
[417]28// There is one thread per physical core.
29// Computation is organised as a binary tree:
30// - All threads execute in parallel a bubble sort on a sub-array during
31//   the first stage of the parallel sort,
32// - The number of participating threads is divided by 2 at each next stage,
33//   where each participating thread merges two sorted subsets of the previous stage.
34//
35//       Number_of_stages = log2(Number_of_threads) + 1 / Number_of_barriers = Number_of_stages + 1
36//
[635]37// The various stages are separated by synchronisation barriers, and the
38// main thread uses the join syscall to check that all threads completed
39// before printing the computation time (sequential & parallel).
40// These results can optionally be registered in an instrumentation file.
41//
[417]42// Constraints :
43// - It supports up to 1024 cores: x_size, y_size, and ncores must be
44//   powers of 2 (max 16*16 clusters / max 4 cores per cluster)
45// - The number of values to be sorted (ARRAY_LENGTH) must be a power of 2
46//   not smaller than the number of cores.
47///////////////////////////////////////////////////////////////////////////////
48
49#include <stdio.h>
50#include <stdlib.h>
[596]51#include <unistd.h>
[417]52#include <pthread.h>
[445]53#include <almosmkh.h>
[459]54#include <hal_macros.h>
[417]55
[635]56#define ARRAY_LENGTH        128        // number of items
[588]57#define MAX_THREADS         1024       // 16 * 16 * 4
[440]58
[623]59#define USE_DQT_BARRIER     1          // use DQT barrier if non zero
60#define DISPLAY_ARRAY       0          // display items values before and after
[635]61#define DEBUG_MAIN          0          // trace main function
62#define DEBUG_SORT          0          // trace sort function
[623]63#define CHECK_RESULT        0          // for debug
64#define INSTRUMENTATION     1          // register computation times on file
65
[417]66/////////////////////////////////////////////////////////////
67// argument for the sort() function (one thread per core)
68/////////////////////////////////////////////////////////////
69
70typedef struct
71{
[619]72    unsigned int threads;       // total number of threads
[417]73    unsigned int thread_uid;    // thread user index (0 to threads -1)
74    unsigned int main_uid;      // main thread user index
75}
76args_t;
77
78//////////////////////////////////////////
79//      Global variables
80//////////////////////////////////////////
81
82int                 array0[ARRAY_LENGTH];    // values to sort
83int                 array1[ARRAY_LENGTH];   
84
85pthread_barrier_t   barrier;                 // synchronisation variables
86
[635]87pthread_t           trdid[MAX_THREADS];      // kernel identifiers
88pthread_attr_t      attr[MAX_THREADS];       // thread attributes
89args_t              arg[MAX_THREADS];        // sort function arguments
[417]90
91////////////////////////////////////
[588]92static void bubbleSort( int * array,
93                        unsigned int length,
94                        unsigned int init_pos )
[417]95{
[473]96    unsigned int i;
97    unsigned int j;
98    int          aux;
[417]99
100    for(i = 0; i < length; i++)
101    {
102        for(j = init_pos; j < (init_pos + length - i - 1); j++)
103        {
104            if(array[j] > array[j + 1])
105            {
106                aux          = array[j + 1];
107                array[j + 1] = array[j];
108                array[j]     = aux;
109            }
110        }
111    }
112}  // end bubbleSort()
113
114
///////////////////////////////////////////////////////////////////////////////
// Merges two sorted subsets A and B of <src>, each containing <length>
// items, into a single sorted sequence of 2 * <length> items written in
// <dst> from index <init_pos_dst>.
// When the two current items are equal, the item of subset B is taken first.
///////////////////////////////////////////////////////////////////////////////
static void merge( const int * src,               // source array
                   int       * dst,               // destination array
                   int         length,            // number of items in a subset
                   int         init_pos_src_a,    // index first item in src subset A
                   int         init_pos_src_b,    // index first item in src subset B
                   int         init_pos_dst )     // index first item in destination
{
    int taken_a = 0;                 // number of items already copied from A
    int taken_b = 0;                 // number of items already copied from B
    int out     = init_pos_dst;      // next write index in dst

    // interleave while both subsets still contain items
    while ( (taken_a < length) && (taken_b < length) )
    {
        int item_a = src[init_pos_src_a + taken_a];
        int item_b = src[init_pos_src_b + taken_b];

        if ( item_a < item_b )
        {
            dst[out] = item_a;
            taken_a++;
        }
        else
        {
            dst[out] = item_b;
            taken_b++;
        }
        out++;
    }

    // copy the remaining items of whichever subset is not exhausted
    for ( ; taken_a < length ; taken_a++ ) dst[out++] = src[init_pos_src_a + taken_a];
    for ( ; taken_b < length ; taken_b++ ) dst[out++] = src[init_pos_src_b + taken_b];
}  // end merge()
158
[588]159//////////////////////////////////////
[504]160static void sort( const args_t * ptr )
[417]161{
162    unsigned int       i;
163    unsigned long long cycle;
[440]164    unsigned int       cxy;
165    unsigned int       lid;
[417]166
[623]167    int              * src_array  = NULL;
168    int              * dst_array  = NULL;
[417]169
[440]170    // get core coordinates an date
171    get_core( &cxy , &lid );
172    get_cycle( &cycle );
173
[417]174    unsigned int  thread_uid = ptr->thread_uid;
175    unsigned int  threads    = ptr->threads;
176    unsigned int  main_uid   = ptr->main_uid;
177
[623]178#if DISPLAY_ARRAY
179unsigned int n;
180if( thread_uid == main_uid )
181{
182    printf("\n*** array before sort\n");
183    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
184}
185#endif
186
187    /////////////////////////////////
188    pthread_barrier_wait( &barrier ); 
189
[627]190#if DEBUG_SORT
191if( thread_uid == 0 )
[623]192printf("\n[sort] thread[%d] exit barrier 0\n", thread_uid );
193#endif
194
[417]195    unsigned int  items      = ARRAY_LENGTH / threads;
196    unsigned int  stages     = __builtin_ctz( threads ) + 1;
197
[627]198#if DEBUG_SORT
199if( thread_uid == 0 )
[623]200printf("\n[sort] thread[%d] : start\n", thread_uid );
201#endif
[442]202
[417]203    bubbleSort( array0, items, items * thread_uid );
204
[627]205#if DEBUG_SORT
206if( thread_uid == 0 )
[623]207printf("\n[sort] thread[%d] : stage 0 completed\n", thread_uid );
208#endif
[417]209
210    /////////////////////////////////
211    pthread_barrier_wait( &barrier ); 
[440]212
[627]213#if DEBUG_SORT
214if( thread_uid == 0 )
[623]215printf("\n[sort] thread[%d] exit barrier 0\n", thread_uid );
216#endif
217
218#if DISPLAY_ARRAY
219if( thread_uid == main_uid )
220{
221    printf("\n*** array after bubble sort\n");
222    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
223}
224#endif
225
226    // the number of threads contributing to sort is divided by 2
227    // and the number of items is multiplied by 2 at each next stage
[417]228    for ( i = 1 ; i < stages ; i++ )
229    {
[623]230        if((i % 2) == 1)               // odd stage
231        {
232            src_array = array0;
233            dst_array = array1;
234        }
235        else                           // even stage
236        {
237            src_array = array1;
238            dst_array = array0;
239        }
[417]240
241        if( (thread_uid & ((1<<i)-1)) == 0 )
242        {
243
[627]244#if DEBUG_SORT
245if( thread_uid == 0 )
[623]246printf("\n[sort] thread[%d] : stage %d start\n", thread_uid , i );
247#endif
[417]248            merge( src_array, 
249                   dst_array,
[623]250                   items << (i-1),
[417]251                   items * thread_uid,
252                   items * (thread_uid + (1 << (i-1))),
253                   items * thread_uid );
254
[627]255#if DEBUG_SORT
256if( thread_uid == 0 )
[623]257printf("\n[sort] thread[%d] : stage %d completed\n", thread_uid , i );
258#endif
[417]259        }
260
261        /////////////////////////////////
262        pthread_barrier_wait( &barrier );
263
[627]264#if DEBUG_SORT
265if( thread_uid == 0 )
[623]266printf("\n[sort] thread[%d] exit barrier %d\n", thread_uid , i );
267#endif
[417]268
[623]269#if DISPLAY_ARRAY
270if( thread_uid == main_uid )
271{
272    printf("\n*** array after merge %d\n", i );
273    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , dst_array[n] );
274}
275#endif
276
277    }  // en for stages
278
[417]279    // all threads but the main thread exit
280    if( thread_uid != main_uid ) pthread_exit( NULL );
281
282} // end sort()
283
284
[574]285/////////////////
[475]286void main( void )
[417]287{
[619]288    int                    error;
[417]289    unsigned int           x_size;             // number of rows
290    unsigned int           y_size;             // number of columns
291    unsigned int           ncores;             // number of cores per cluster
[619]292    unsigned int           total_threads;      // total number of threads
[417]293    unsigned int           thread_uid;         // user defined thread index
294    unsigned int           main_cxy;           // cluster identifier for main
295    unsigned int           main_x;             // X coordinate for main thread
296    unsigned int           main_y;             // Y coordinate for main thread
297    unsigned int           main_lid;           // core local index for main thread
298    unsigned int           main_uid;           // thread user index for main thread
299    unsigned int           x;                  // X coordinate for a thread
300    unsigned int           y;                  // Y coordinate for a thread
301    unsigned int           lid;                // core local index for a thread
302    unsigned int           n;                  // index in array to sort
303    pthread_barrierattr_t  barrier_attr;       // barrier attributes
304
[623]305    unsigned long long     start_cycle;
306    unsigned long long     seq_end_cycle;
307    unsigned long long     para_end_cycle;
308
309    /////////////////////////
310    get_cycle( &start_cycle );
311 
[596]312    // compute number of threads (one thread per core)
[417]313    get_config( &x_size , &y_size , &ncores );
[619]314    total_threads = x_size * y_size * ncores;
[417]315
316    // get core coordinates and user index for the main thread
317    get_core( &main_cxy , & main_lid );
[459]318    main_x   = HAL_X_FROM_CXY( main_cxy );
319    main_y   = HAL_Y_FROM_CXY( main_cxy );
[417]320    main_uid = (((main_x * y_size) + main_y) * ncores) + main_lid; 
321
322    // checks number of threads
[619]323    if ( (total_threads != 1)   && (total_threads != 2)   && (total_threads != 4)   && 
324         (total_threads != 8)   && (total_threads != 16 ) && (total_threads != 32)  && 
325         (total_threads != 64)  && (total_threads != 128) && (total_threads != 256) && 
326         (total_threads != 512) && (total_threads != 1024) )
[417]327    {
[623]328        printf("\n[sort error] number of cores must be power of 2\n");
[434]329        exit( 0 );
[417]330    }
331
332    // check array size
[619]333    if ( ARRAY_LENGTH % total_threads) 
[417]334    {
[623]335        printf("\n[sort error] array size must be multiple of number of threads\n");
[434]336        exit( 0 );
[417]337    }
338
[624]339    printf("\n[sort] main starts / %d threads / %d items / pid %x / cycle %d\n",
[623]340    total_threads, ARRAY_LENGTH, getpid(), (unsigned int)start_cycle );
[417]341
[619]342    // initialize barrier
343    if( USE_DQT_BARRIER )
[417]344    {
[619]345        barrier_attr.x_size   = x_size; 
346        barrier_attr.y_size   = y_size;
347        barrier_attr.nthreads = ncores;
348        error = pthread_barrier_init( &barrier, &barrier_attr , total_threads );
349    }
350    else // use SIMPLE_BARRIER
351    {
352        error = pthread_barrier_init( &barrier, NULL , total_threads );
353    }
354
355    if( error )
356    {
[623]357        printf("\n[sort error] cannot initialise barrier\n" );
[434]358        exit( 0 );
[417]359    }
360
[627]361#if DEBUG_MAIN
[628]362if( USE_DQT_BARRIER ) printf("\n[sort] main completes DQT barrier init\n");
363else                  printf("\n[sort] main completes simple barrier init\n");
[623]364#endif
[417]365
366    // Array to sort initialization
367    for ( n = 0 ; n < ARRAY_LENGTH ; n++ )
368    {
[596]369        array0[n] = ARRAY_LENGTH - n - 1;
[417]370    }
371
[627]372#if DEBUG_MAIN
[623]373printf("\n[sort] main completes array init\n");
[417]374#endif
375
376    // launch other threads to execute sort() function
377    // on cores other than the core running the main thread
[635]378    for ( x = 0 ; x < x_size ; x++ )
[417]379    {
[635]380        for ( y = 0 ; y < y_size ; y++ )
[417]381        {
[635]382            for ( lid = 0 ; lid < ncores ; lid++ )
[417]383            {
[635]384                // compute thread user index (continuous index)
[417]385                thread_uid = (((x * y_size) + y) * ncores) + lid;
386
[635]387                // set arguments for all threads
[619]388                arg[thread_uid].threads      = total_threads;
[417]389                arg[thread_uid].thread_uid   = thread_uid;
390                arg[thread_uid].main_uid     = main_uid;
391
392                // set thread attributes for all threads
[635]393                attr[thread_uid].attributes = PT_ATTR_CLUSTER_DEFINED | PT_ATTR_CORE_DEFINED;
[459]394                attr[thread_uid].cxy        = HAL_CXY_FROM_XY( x , y );
[417]395                attr[thread_uid].lid        = lid;
396
397                if( thread_uid != main_uid )
398                {
[635]399                    if ( pthread_create( &trdid[thread_uid],  // buffer for kernel identifier
[417]400                                         &attr[thread_uid],   // thread attributes
401                                         &sort,               // entry function
402                                         &arg[thread_uid] ) ) // sort arguments
403                    {
[623]404                        printf("\n[sort error] main cannot create thread %x \n", thread_uid );
[434]405                        exit( 0 );
[417]406                    }
[635]407
408#if (DEBUG_MAIN & 1)
[623]409printf("\n[sort] main created thread %x \n", thread_uid );
410#endif
[440]411                }
[417]412            }
413        }
414    }
[442]415   
[623]416    ///////////////////////////
417    get_cycle( &seq_end_cycle );
[417]418
[627]419#if DEBUG_MAIN
[623]420printf("\n[sort] main completes sequencial init at cycle %d\n",
421(unsigned int)seq_end_cycle );
422#endif
423
[445]424    // the main thread run also the sort() function
425    sort( &arg[main_uid] );
426
[635]427    // wait other threads completion
428    for ( x = 0 ; x < x_size ; x++ )
429    {
430        for ( y = 0 ; y < y_size ; y++ )
431        {
432            for ( lid = 0 ; lid < ncores ; lid++ )
433            {
434                // compute thread continuous index
435                thread_uid = (((x * y_size) + y) * ncores) + lid;
436
437                if( thread_uid != main_uid )
438                {
439                    if( pthread_join( trdid[thread_uid] , NULL ) )
440                    {
441                        printf("\n[fft error] in main thread %d joining thread %d\n",
442                        main_uid , thread_uid );
443                        exit( 0 );
444                    }
445                   
446#if (DEBUG_MAIN & 1)
447printf("\n[fft] main thread %d joined thread %d\n", main_uid, thread_uid );
448#endif
449
450                }
451            }
452        }
453    }
454
[623]455    ////////////////////////////
456    get_cycle( &para_end_cycle );
[619]457
[623]458    printf("\n[sort] main completes parallel sort at cycle %d\n", 
459    (unsigned int)para_end_cycle );
460
[619]461    // destroy barrier
462    pthread_barrier_destroy( &barrier );
463
[625]464#if CHECK_RESULT
465    int    success = 1;
466    int *  res_array = ( (total_threads ==   2) ||
467                         (total_threads ==   8) || 
468                         (total_threads ==  32) || 
469                         (total_threads == 128) || 
470                         (total_threads == 512) ) ? array1 : array0;
[623]471
[625]472    for( n=0 ; n<(ARRAY_LENGTH-2) ; n++ )
[417]473    {
[625]474        if ( res_array[n] > res_array[n+1] )
475        {
476            printf("\n[sort] array[%d] = %d > array[%d] = %d\n",
477            n , res_array[n] , n+1 , res_array[n+1] );
478            success = 0;
479            break;
480        }
[417]481    }
482
[625]483    if ( success ) printf("\n[sort] success\n");
484    else           printf("\n[sort] failure\n");
[417]485#endif
486
[623]487#if INSTRUMENTATION
[626]488    char               name[64];
489    char               path[128];
490    unsigned long long instru_cycle;
[417]491
[629]492    // build file name
493    if( USE_DQT_BARRIER )
494    snprintf( name , 64 , "sort_dqt_%d_%d_%d", ARRAY_LENGTH, x_size * y_size, ncores );
495    else
496    snprintf( name , 64 , "sort_smp_%d_%d_%d", ARRAY_LENGTH, x_size * y_size, ncores );
[623]497
[625]498    // build file pathname
499    snprintf( path , 128 , "home/%s" , name );
[623]500
[625]501    // compute results
502    unsigned int sequencial = (unsigned int)(seq_end_cycle - start_cycle);
503    unsigned int parallel   = (unsigned int)(para_end_cycle - seq_end_cycle);
[623]504
[625]505    // display results on process terminal
506    printf("\n----- %s -----\n"
507           " - sequencial : %d cycles\n"
508           " - parallel   : %d cycles\n", 
509           name, sequencial, parallel );
[623]510
[625]511    // open file
[626]512    get_cycle( &instru_cycle );
[625]513    FILE * stream = fopen( path , NULL );
[626]514
[625]515    if( stream == NULL )
516    {
[626]517        printf("\n[sort error] cannot open instrumentation file <%s>\n", path );
[625]518        exit(0);
519    }
[623]520
[626]521    printf("\n[sort] file <%s> open at cycle %d\n", path, (unsigned int)instru_cycle );
[625]522
[626]523#if IDBG
524idbg();
525#endif
526
[625]527    // register results to file
[626]528    get_cycle( &instru_cycle );
[625]529    int ret = fprintf( stream , "\n----- %s -----\n"
530                                " - sequencial : %d cycles\n"
531                                " - parallel   : %d cycles\n", name, sequencial, parallel );
532    if( ret < 0 )
533    {
[626]534        printf("\n[sort error] cannot write to instrumentation file <%s>\n", path );
[625]535        exit(0);
536    }
537
[626]538    printf("\n[sort] file <%s> written at cycle %d\n", path, (unsigned int)instru_cycle );
[625]539
[626]540#if IDBG
541idbg();
542#endif
543
[625]544    // close instrumentation file
[626]545    get_cycle( &instru_cycle );
546    ret = fclose( stream );
[625]547
[626]548    if( ret )
[625]549    {
[626]550        printf("\n[sort error] cannot close instrumentation file <%s>\n", path );
[625]551        exit(0);
552    }
553
[626]554    printf("\n[sort] file <%s> closed at cycle %d\n", path, (unsigned int)instru_cycle );
[625]555
[619]556#endif
557
558    exit( 0 );
559
[417]560}  // end main()
561
562/*
563vim: tabstop=4 : shiftwidth=4 : expandtab
564*/
Note: See TracBrowser for help on using the repository browser.