source: trunk/user/sort/sort.c @ 625

Last change on this file since 625 was 625, checked in by alain, 5 years ago

Fix a bug in the vmm_remove_vseg() function: the physical pages
associated to an user DATA vseg were released to the kernel when
the target process descriptor was in the reference cluster.
This physical pages release should be done only when the page
forks counter value is zero.
All other modifications are cosmetic.

  • Property svn:executable set to *
File size: 15.4 KB
RevLine 
[417]1///////////////////////////////////////////////////////////////////////////////
2// File   :  sort.c
3// Date   :  November 2013
4// Author :  Cesar Fuguet Tortolero <cesar.fuguet-tortolero@lip6.fr>
5///////////////////////////////////////////////////////////////////////////////
6// This multi-threaded application implement a multi-stage sort application.
7// The various stages are separated by synchronisation barriers.
8// There is one thread per physical cores.
9// Computation is organised as a binary tree:
10// - All threads execute in parallel a buble sort on a sub-array during the
11//   the first stage of parallel sort,
12// - The number of participating threads is divided by 2 at each next stage,
13//   to make a merge sort, on two subsets of previous stage.
14//
15//       Number_of_stages = number of barriers = log2(Number_of_threads)
16//
17// Constraints :
18// - It supports up to 1024 cores: x_size, y_size, and ncores must be
19//   power of 2 (max 16*16 clusters / max 4 cores per cluster)
20// _ The array of values to be sorted (ARRAY_LENGTH) must be power of 2
21//   larger than the number of cores.
22///////////////////////////////////////////////////////////////////////////////
23
24#include <stdio.h>
25#include <stdlib.h>
[596]26#include <unistd.h>
[417]27#include <pthread.h>
[445]28#include <almosmkh.h>
[459]29#include <hal_macros.h>
[417]30
[625]31#define ARRAY_LENGTH        256        // number of items
[588]32#define MAX_THREADS         1024       // 16 * 16 * 4
[440]33
[623]34#define USE_DQT_BARRIER     1          // use DQT barrier if non zero
35#define DISPLAY_ARRAY       0          // display items values before and after
36#define VERBOSE             0          // for debug
37#define INTERACTIVE_MODE    0          // for debug
38#define CHECK_RESULT        0          // for debug
39#define INSTRUMENTATION     1          // register computation times on file
40
[417]41/////////////////////////////////////////////////////////////
42// argument for the sort() function (one thread per core)
43/////////////////////////////////////////////////////////////
44
45typedef struct
46{
[619]47    unsigned int threads;       // total number of threads
[417]48    unsigned int thread_uid;    // thread user index (0 to threads -1)
49    unsigned int main_uid;      // main thread user index
50}
51args_t;
52
53//////////////////////////////////////////
54//      Global variables
55//////////////////////////////////////////
56
57int                 array0[ARRAY_LENGTH];    // values to sort
58int                 array1[ARRAY_LENGTH];   
59
60pthread_barrier_t   barrier;                 // synchronisation variables
61
[440]62pthread_attr_t      attr[MAX_THREADS];       // thread attributes (one per thread)
63args_t              arg[MAX_THREADS];        // sort function arguments (one per thread)
[417]64
65////////////////////////////////////
[588]66static void bubbleSort( int * array,
67                        unsigned int length,
68                        unsigned int init_pos )
[417]69{
[473]70    unsigned int i;
71    unsigned int j;
72    int          aux;
[417]73
74    for(i = 0; i < length; i++)
75    {
76        for(j = init_pos; j < (init_pos + length - i - 1); j++)
77        {
78            if(array[j] > array[j + 1])
79            {
80                aux          = array[j + 1];
81                array[j + 1] = array[j];
82                array[j]     = aux;
83            }
84        }
85    }
86}  // end bubbleSort()
87
88
[588]89///////////////////////////////////
[623]90static void merge( const int * src,               // source array
91                   int       * dst,               // destination array
92                   int         length,            // number of items in a subset
93                   int         init_pos_src_a,    // index first item in src subset A
94                   int         init_pos_src_b,    // index first item in src subset B
95                   int         init_pos_dst )     // index first item in destination
[417]96{
97    int i;
98    int j;
99    int k;
100
101    i = 0;
102    j = 0;
103    k = init_pos_dst;
104
105    while((i < length) || (j < length))
106    {
107        if((i < length) && (j < length))
108        {
109            if(src[init_pos_src_a + i] < src[init_pos_src_b + j])
110            {
111                dst[k++] = src[init_pos_src_a + i];
112                i++;
113            }
114            else
115            {
116                dst[k++] = src[init_pos_src_b + j];
117                j++;
118            }
119        }
120        else if(i < length)
121        {
122            dst[k++] = src[init_pos_src_a + i];
123            i++;
124        }
125        else
126        {
127            dst[k++] = src[init_pos_src_b + j];
128            j++;
129        }
130    }
131}  // end merge()
132
[588]133//////////////////////////////////////
[504]134static void sort( const args_t * ptr )
[417]135{
136    unsigned int       i;
137    unsigned long long cycle;
[440]138    unsigned int       cxy;
139    unsigned int       lid;
[417]140
[623]141    int              * src_array  = NULL;
142    int              * dst_array  = NULL;
[417]143
[440]144    // get core coordinates an date
145    get_core( &cxy , &lid );
146    get_cycle( &cycle );
147
[417]148    unsigned int  thread_uid = ptr->thread_uid;
149    unsigned int  threads    = ptr->threads;
150    unsigned int  main_uid   = ptr->main_uid;
151
[623]152#if DISPLAY_ARRAY
153unsigned int n;
154if( thread_uid == main_uid )
155{
156    printf("\n*** array before sort\n");
157    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
158}
159#endif
160
161    /////////////////////////////////
162    pthread_barrier_wait( &barrier ); 
163
164#if VERBOSE
165printf("\n[sort] thread[%d] exit barrier 0\n", thread_uid );
166#endif
167
[417]168    unsigned int  items      = ARRAY_LENGTH / threads;
169    unsigned int  stages     = __builtin_ctz( threads ) + 1;
170
[623]171#if VERBOSE
172printf("\n[sort] thread[%d] : start\n", thread_uid );
173#endif
[442]174
[417]175    bubbleSort( array0, items, items * thread_uid );
176
[623]177#if VERBOSE
178printf("\n[sort] thread[%d] : stage 0 completed\n", thread_uid );
179#endif
[417]180
181    /////////////////////////////////
182    pthread_barrier_wait( &barrier ); 
[440]183
[623]184#if VERBOSE
185printf("\n[sort] thread[%d] exit barrier 0\n", thread_uid );
186#endif
187
188#if DISPLAY_ARRAY
189if( thread_uid == main_uid )
190{
191    printf("\n*** array after bubble sort\n");
192    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
193}
194#endif
195
196    // the number of threads contributing to sort is divided by 2
197    // and the number of items is multiplied by 2 at each next stage
[417]198    for ( i = 1 ; i < stages ; i++ )
199    {
[623]200        if((i % 2) == 1)               // odd stage
201        {
202            src_array = array0;
203            dst_array = array1;
204        }
205        else                           // even stage
206        {
207            src_array = array1;
208            dst_array = array0;
209        }
[417]210
211        if( (thread_uid & ((1<<i)-1)) == 0 )
212        {
213
[623]214#if VERBOSE
215printf("\n[sort] thread[%d] : stage %d start\n", thread_uid , i );
216#endif
[417]217            merge( src_array, 
218                   dst_array,
[623]219                   items << (i-1),
[417]220                   items * thread_uid,
221                   items * (thread_uid + (1 << (i-1))),
222                   items * thread_uid );
223
[623]224#if VERBOSE
225printf("\n[sort] thread[%d] : stage %d completed\n", thread_uid , i );
226#endif
[417]227        }
228
229        /////////////////////////////////
230        pthread_barrier_wait( &barrier );
231
[623]232#if VERBOSE
233printf("\n[sort] thread[%d] exit barrier %d\n", thread_uid , i );
234#endif
[417]235
[623]236#if DISPLAY_ARRAY
237if( thread_uid == main_uid )
238{
239    printf("\n*** array after merge %d\n", i );
240    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , dst_array[n] );
241}
242#endif
243
244    }  // en for stages
245
[417]246    // all threads but the main thread exit
247    if( thread_uid != main_uid ) pthread_exit( NULL );
248
249} // end sort()
250
251
[574]252/////////////////
[475]253void main( void )
[417]254{
[619]255    int                    error;
[417]256    unsigned int           x_size;             // number of rows
257    unsigned int           y_size;             // number of columns
258    unsigned int           ncores;             // number of cores per cluster
[619]259    unsigned int           total_threads;      // total number of threads
[417]260    unsigned int           thread_uid;         // user defined thread index
261    unsigned int           main_cxy;           // cluster identifier for main
262    unsigned int           main_x;             // X coordinate for main thread
263    unsigned int           main_y;             // Y coordinate for main thread
264    unsigned int           main_lid;           // core local index for main thread
265    unsigned int           main_uid;           // thread user index for main thread
266    unsigned int           x;                  // X coordinate for a thread
267    unsigned int           y;                  // Y coordinate for a thread
268    unsigned int           lid;                // core local index for a thread
269    unsigned int           n;                  // index in array to sort
270    pthread_t              trdid;              // kernel allocated thread index (unused)
271    pthread_barrierattr_t  barrier_attr;       // barrier attributes
272
[623]273    unsigned long long     start_cycle;
274    unsigned long long     seq_end_cycle;
275    unsigned long long     para_end_cycle;
276
277    /////////////////////////
278    get_cycle( &start_cycle );
279 
[596]280    // compute number of threads (one thread per core)
[417]281    get_config( &x_size , &y_size , &ncores );
[619]282    total_threads = x_size * y_size * ncores;
[417]283
284    // get core coordinates and user index for the main thread
285    get_core( &main_cxy , & main_lid );
[459]286    main_x   = HAL_X_FROM_CXY( main_cxy );
287    main_y   = HAL_Y_FROM_CXY( main_cxy );
[417]288    main_uid = (((main_x * y_size) + main_y) * ncores) + main_lid; 
289
290    // checks number of threads
[619]291    if ( (total_threads != 1)   && (total_threads != 2)   && (total_threads != 4)   && 
292         (total_threads != 8)   && (total_threads != 16 ) && (total_threads != 32)  && 
293         (total_threads != 64)  && (total_threads != 128) && (total_threads != 256) && 
294         (total_threads != 512) && (total_threads != 1024) )
[417]295    {
[623]296        printf("\n[sort error] number of cores must be power of 2\n");
[434]297        exit( 0 );
[417]298    }
299
300    // check array size
[619]301    if ( ARRAY_LENGTH % total_threads) 
[417]302    {
[623]303        printf("\n[sort error] array size must be multiple of number of threads\n");
[434]304        exit( 0 );
[417]305    }
306
[624]307    printf("\n[sort] main starts / %d threads / %d items / pid %x / cycle %d\n",
[623]308    total_threads, ARRAY_LENGTH, getpid(), (unsigned int)start_cycle );
[417]309
[619]310    // initialize barrier
311    if( USE_DQT_BARRIER )
[417]312    {
[619]313        barrier_attr.x_size   = x_size; 
314        barrier_attr.y_size   = y_size;
315        barrier_attr.nthreads = ncores;
316        error = pthread_barrier_init( &barrier, &barrier_attr , total_threads );
317    }
318    else // use SIMPLE_BARRIER
319    {
320        error = pthread_barrier_init( &barrier, NULL , total_threads );
321    }
322
323    if( error )
324    {
[623]325        printf("\n[sort error] cannot initialise barrier\n" );
[434]326        exit( 0 );
[417]327    }
328
[623]329#if VERBOSE
330printf("\n[sort] main completes barrier init\n");
331#endif
[417]332
333    // Array to sort initialization
334    for ( n = 0 ; n < ARRAY_LENGTH ; n++ )
335    {
[596]336        array0[n] = ARRAY_LENGTH - n - 1;
[417]337    }
338
[623]339#if VERBOSE
340printf("\n[sort] main completes array init\n");
[417]341#endif
342
343    // launch other threads to execute sort() function
344    // on cores other than the core running the main thread
345    for ( x=0 ; x<x_size ; x++ )
346    {
347        for ( y=0 ; y<y_size ; y++ )
348        {
349            for ( lid=0 ; lid<ncores ; lid++ )
350            {
351                thread_uid = (((x * y_size) + y) * ncores) + lid;
352
353                // set sort arguments for all threads
[619]354                arg[thread_uid].threads      = total_threads;
[417]355                arg[thread_uid].thread_uid   = thread_uid;
356                arg[thread_uid].main_uid     = main_uid;
357
358                // set thread attributes for all threads
[574]359                attr[thread_uid].attributes = PT_ATTR_DETACH          |
360                                              PT_ATTR_CLUSTER_DEFINED | 
361                                              PT_ATTR_CORE_DEFINED;
[459]362                attr[thread_uid].cxy        = HAL_CXY_FROM_XY( x , y );
[417]363                attr[thread_uid].lid        = lid;
364
365                if( thread_uid != main_uid )
366                {
367                    if ( pthread_create( &trdid,              // not used because no join
368                                         &attr[thread_uid],   // thread attributes
369                                         &sort,               // entry function
370                                         &arg[thread_uid] ) ) // sort arguments
371                    {
[623]372                        printf("\n[sort error] main cannot create thread %x \n", thread_uid );
[434]373                        exit( 0 );
[417]374                    }
[442]375                    else
376                    {
[623]377#if VERBOSE
378printf("\n[sort] main created thread %x \n", thread_uid );
379#endif
[442]380                    }
[440]381                }
[417]382            }
383        }
384    }
[442]385   
[623]386    ///////////////////////////
387    get_cycle( &seq_end_cycle );
[417]388
[623]389#if VERBOSE
390printf("\n[sort] main completes sequencial init at cycle %d\n",
391(unsigned int)seq_end_cycle );
392#endif
393
[442]394#if INTERACTIVE_MODE
395idbg();
396#endif
[441]397   
[445]398    // the main thread run also the sort() function
399    sort( &arg[main_uid] );
400
[623]401    ////////////////////////////
402    get_cycle( &para_end_cycle );
[619]403
[623]404    printf("\n[sort] main completes parallel sort at cycle %d\n", 
405    (unsigned int)para_end_cycle );
406
[619]407    // destroy barrier
408    pthread_barrier_destroy( &barrier );
409
410#if INTERACTIVE_MODE
411idbg();
412#endif
413
[625]414#if CHECK_RESULT
415   
416    int    success = 1;
417    int *  res_array = ( (total_threads ==   2) ||
418                         (total_threads ==   8) || 
419                         (total_threads ==  32) || 
420                         (total_threads == 128) || 
421                         (total_threads == 512) ) ? array1 : array0;
[623]422
[625]423    for( n=0 ; n<(ARRAY_LENGTH-2) ; n++ )
[417]424    {
[625]425        if ( res_array[n] > res_array[n+1] )
426        {
427            printf("\n[sort] array[%d] = %d > array[%d] = %d\n",
428            n , res_array[n] , n+1 , res_array[n+1] );
429            success = 0;
430            break;
431        }
[417]432    }
433
[625]434    if ( success ) printf("\n[sort] success\n");
435    else           printf("\n[sort] failure\n");
436
[417]437#endif
438
[623]439#if INSTRUMENTATION
[417]440
[625]441    char   name[64];
442    char   path[128];
[417]443
[625]444    // build a file name from n_items / n_clusters / n_cores
445    if( USE_DQT_BARRIER ) snprintf( name , 64 , "sort_dqt_%d_%d_%d", 
446                          ARRAY_LENGTH, x_size * y_size, ncores );
447    else                  snprintf( name , 64 , "sort_smp_%d_%d_%d", 
448                          ARRAY_LENGTH, x_size * y_size, ncores );
[623]449
[625]450    // build file pathname
451    snprintf( path , 128 , "home/%s" , name );
[623]452
[625]453    // compute results
454    unsigned int sequencial = (unsigned int)(seq_end_cycle - start_cycle);
455    unsigned int parallel   = (unsigned int)(para_end_cycle - seq_end_cycle);
[623]456
[625]457    // display results on process terminal
458    printf("\n----- %s -----\n"
459           " - sequencial : %d cycles\n"
460           " - parallel   : %d cycles\n", 
461           name, sequencial, parallel );
[623]462
[625]463    // open file
464    FILE * stream = fopen( path , NULL );
465    if( stream == NULL )
466    {
467        printf("\n[sort error] cannot open instrumentation file <%s>\n", name );
468        exit(0);
469    }
[623]470
[625]471    printf("\n[sort] file %s successfully open\n", path);
472
473    // register results to file
474    int ret = fprintf( stream , "\n----- %s -----\n"
475                                " - sequencial : %d cycles\n"
476                                " - parallel   : %d cycles\n", name, sequencial, parallel );
477    if( ret < 0 )
478    {
479        printf("\n[sort error] cannot write to instrumentation file <%s>\n", name );
480        exit(0);
481    }
482
483    printf("\n[sort] file %s successfully written\n", path);
484
485    // close instrumentation file
486
487    if( fclose( stream ) )
488    {
489        printf("\n[sort error] cannot close the file <%s>\n", name );
490        exit(0);
491    }
492
493    printf("\n[sort] file %s successfully closed\n", path);
494
[619]495#endif
496
497    exit( 0 );
498
[417]499}  // end main()
500
501/*
502vim: tabstop=4 : shiftwidth=4 : expandtab
503*/
Note: See TracBrowser for help on using the repository browser.