source: trunk/user/sort/sort.c @ 440

Last change on this file since 440 was 440, checked in by alain, 6 years ago

1/ Fix a bug in the Multithreaded "sort" applicationr:
The pthread_create() arguments must be declared as global variables.
2/ The exit syscall can be called by any thread of a process..

  • Property svn:executable set to *
File size: 12.2 KB
Line 
1///////////////////////////////////////////////////////////////////////////////
2// File   :  sort.c
3// Date   :  November 2013
4// Author :  Cesar Fuguet Tortolero <cesar.fuguet-tortolero@lip6.fr>
5///////////////////////////////////////////////////////////////////////////////
6// This multi-threaded application implement a multi-stage sort application.
7// The various stages are separated by synchronisation barriers.
8// There is one thread per physical cores.
9// Computation is organised as a binary tree:
10// - All threads execute in parallel a buble sort on a sub-array during the
11//   the first stage of parallel sort,
12// - The number of participating threads is divided by 2 at each next stage,
13//   to make a merge sort, on two subsets of previous stage.
14//
15//       Number_of_stages = number of barriers = log2(Number_of_threads)
16//
17// Constraints :
18// - It supports up to 1024 cores: x_size, y_size, and ncores must be
19//   power of 2 (max 16*16 clusters / max 4 cores per cluster)
20// _ The array of values to be sorted (ARRAY_LENGTH) must be power of 2
21//   larger than the number of cores.
22///////////////////////////////////////////////////////////////////////////////
23
24#include <stdio.h>
25#include <stdlib.h>
26#include <malloc.h>
27#include <pthread.h>
28
29#define ARRAY_LENGTH        0x100    // 256 values
30
31#define MAX_THREADS         1024     // 16 * 16 * 4
32
33#define DISPLAY_ARRAY       0
34#define DISPLAY_THREADS     1
35
36///////////////////////////////////////////////////////
37// macros for fixed format cxy <=> (x,y) translation
38// TODO these macros are only for TSAR architecture...
39///////////////////////////////////////////////////////
40
41#define CXY_FROM_XY( x , y )  ((x<<4) + y)
42
43#define X_FROM_CXY( cxy )     ((cxy>>4) & 0xF)
44
45#define Y_FROM_CXY( cxy )     (cxy & 0xF)
46
47/////////////////////////////////////////////////////////////
48// argument for the sort() function (one thread per core)
49/////////////////////////////////////////////////////////////
50
51typedef struct
52{
53    unsigned int threads;      // total number of threads
54    unsigned int thread_uid;    // thread user index (0 to threads -1)
55    unsigned int main_uid;      // main thread user index
56}
57args_t;
58
59//////////////////////////////////////////
60//      Global variables
61//////////////////////////////////////////
62
63int                 array0[ARRAY_LENGTH];    // values to sort
64int                 array1[ARRAY_LENGTH];   
65
66pthread_barrier_t   barrier;                 // synchronisation variables
67
68pthread_attr_t      attr[MAX_THREADS];       // thread attributes (one per thread)
69args_t              arg[MAX_THREADS];        // sort function arguments (one per thread)
70
71////////////////////////////////////
72void bubbleSort( int *        array,
73                 unsigned int length,
74                 unsigned int init_pos )
75{
76    int i;
77    int j;
78    int aux;
79
80    for(i = 0; i < length; i++)
81    {
82        for(j = init_pos; j < (init_pos + length - i - 1); j++)
83        {
84            if(array[j] > array[j + 1])
85            {
86                aux          = array[j + 1];
87                array[j + 1] = array[j];
88                array[j]     = aux;
89            }
90        }
91    }
92}  // end bubbleSort()
93
94
95/////////////////////////
96void merge( int * src,
97            int * dst,
98            int length,
99            int init_pos_src_a,
100            int init_pos_src_b,
101            int init_pos_dst )
102{
103    int i;
104    int j;
105    int k;
106
107    i = 0;
108    j = 0;
109    k = init_pos_dst;
110
111    while((i < length) || (j < length))
112    {
113        if((i < length) && (j < length))
114        {
115            if(src[init_pos_src_a + i] < src[init_pos_src_b + j])
116            {
117                dst[k++] = src[init_pos_src_a + i];
118                i++;
119            }
120            else
121            {
122                dst[k++] = src[init_pos_src_b + j];
123                j++;
124            }
125        }
126        else if(i < length)
127        {
128            dst[k++] = src[init_pos_src_a + i];
129            i++;
130        }
131        else
132        {
133            dst[k++] = src[init_pos_src_b + j];
134            j++;
135        }
136    }
137}  // end merge()
138
139/////////////////////////
140void sort( args_t * ptr )
141{
142    unsigned int       i;
143    unsigned long long cycle;
144    unsigned int       cxy;
145    unsigned int       lid;
146
147    int         * src_array  = NULL;
148    int         * dst_array  = NULL;
149
150    // get core coordinates an date
151    get_core( &cxy , &lid );
152    get_cycle( &cycle );
153
154    unsigned int  thread_uid = ptr->thread_uid;
155    unsigned int  threads    = ptr->threads;
156    unsigned int  main_uid   = ptr->main_uid;
157
158printf("\n### core[%x,%d] enter sort : threads %d / thread_uid %x / main_uid %x / cycle %d\n",
159cxy, lid, threads, thread_uid, main_uid, (int)cycle );
160
161    while( 1 ) { asm volatile("nop"); }
162
163    unsigned int  items      = ARRAY_LENGTH / threads;
164    unsigned int  stages     = __builtin_ctz( threads ) + 1;
165
166    bubbleSort( array0, items, items * thread_uid );
167
168    printf("\n[SORT] thread[%d] / stage 0 completed\n", thread_uid );
169
170    /////////////////////////////////
171    pthread_barrier_wait( &barrier ); 
172
173    printf("\n[SORT] thread[%d] exit barrier\n", thread_uid );
174
175    // the number of threads contributing to sort
176    // is divided by 2 at each next stage
177    for ( i = 1 ; i < stages ; i++ )
178    {
179        pthread_barrier_wait( &barrier );
180
181        if( (thread_uid & ((1<<i)-1)) == 0 )
182        {
183            printf("\n[SORT] thread[%d] / stage %d start\n", thread_uid , i );
184
185            if((i % 2) == 1)               // odd stage
186            {
187                src_array = array0;
188                dst_array = array1;
189            }
190            else                           // even stage
191            {
192                src_array = array1;
193                dst_array = array0;
194            }
195
196            merge( src_array, 
197                   dst_array,
198                   items << i,
199                   items * thread_uid,
200                   items * (thread_uid + (1 << (i-1))),
201                   items * thread_uid );
202
203            printf("\n[SORT] thread[%d] / stage %d completed\n", thread_uid , i );
204        }
205
206        /////////////////////////////////
207        pthread_barrier_wait( &barrier );
208
209    }
210
211    // all threads but the main thread exit
212    if( thread_uid != main_uid ) pthread_exit( NULL );
213
214} // end sort()
215
216
217///////////
218void main()
219{
220    unsigned int           x_size;             // number of rows
221    unsigned int           y_size;             // number of columns
222    unsigned int           ncores;             // number of cores per cluster
223    unsigned int           threads;            // total number of threads
224    unsigned int           thread_uid;         // user defined thread index
225    unsigned int           main_cxy;           // cluster identifier for main
226    unsigned int           main_x;             // X coordinate for main thread
227    unsigned int           main_y;             // Y coordinate for main thread
228    unsigned int           main_lid;           // core local index for main thread
229    unsigned int           main_uid;           // thread user index for main thread
230    unsigned int           x;                  // X coordinate for a thread
231    unsigned int           y;                  // Y coordinate for a thread
232    unsigned int           lid;                // core local index for a thread
233    unsigned int           n;                  // index in array to sort
234    unsigned long long     cycle;              // current date for log
235    pthread_t              trdid;              // kernel allocated thread index (unused)
236    pthread_barrierattr_t  barrier_attr;       // barrier attributes
237
238    // compute number of threads (one thread per proc)
239    get_config( &x_size , &y_size , &ncores );
240    threads = x_size * y_size * ncores;
241
242    // get core coordinates and user index for the main thread
243    get_core( &main_cxy , & main_lid );
244    main_x   = X_FROM_CXY( main_cxy );
245    main_y   = Y_FROM_CXY( main_cxy );
246    main_uid = (((main_x * y_size) + main_y) * ncores) + main_lid; 
247
248    // checks number of threads
249    if ( (threads != 1)   && (threads != 2)   && (threads != 4)   && 
250         (threads != 8)   && (threads != 16 ) && (threads != 32)  && 
251         (threads != 64)  && (threads != 128) && (threads != 256) && 
252         (threads != 512) && (threads != 1024) )
253    {
254        printf("\n[SORT ERROR] number of cores must be power of 2\n");
255        exit( 0 );
256    }
257
258    // check array size
259    if ( ARRAY_LENGTH % threads) 
260    {
261        printf("\n[SORT ERROR] array size must be multiple of number of threads\n");
262        exit( 0 );
263    }
264
265    get_cycle( &cycle );
266    printf("\n\n[SORT] main starts on core[%x,%d] / %d threads / %d values / cycle %d\n",
267    main_cxy, main_lid, threads, ARRAY_LENGTH, (unsigned int)cycle );
268
269    // Barrier initialization
270    barrier_attr.x_size   = x_size; 
271    barrier_attr.y_size   = y_size;
272    barrier_attr.nthreads = ncores;
273    if( pthread_barrier_init( &barrier, &barrier_attr , threads ) )
274    {
275        printf("\n[SORT ERROR] cannot initialise barrier\n" );
276        exit( 0 );
277    }
278
279    get_cycle( &cycle );
280    printf("\n[SORT] main completes barrier init at cycle %d\n", (unsigned int)cycle );
281
282    // Array to sort initialization
283    for ( n = 0 ; n < ARRAY_LENGTH ; n++ )
284    {
285        array0[n] = rand();
286    }
287
288#if DISPLAY_ARRAY
289printf("\n*** array before sort\n");
290for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
291#endif
292
293    get_cycle( &cycle );
294    printf("\n[SORT] main completes array init at cycle %d\n", (unsigned int)cycle );
295
296    // launch other threads to execute sort() function
297    // on cores other than the core running the main thread
298    for ( x=0 ; x<x_size ; x++ )
299    {
300        for ( y=0 ; y<y_size ; y++ )
301        {
302            for ( lid=0 ; lid<ncores ; lid++ )
303            {
304                thread_uid = (((x * y_size) + y) * ncores) + lid;
305
306                // set sort arguments for all threads
307                arg[thread_uid].threads      = threads;
308                arg[thread_uid].thread_uid   = thread_uid;
309                arg[thread_uid].main_uid     = main_uid;
310
311                // set thread attributes for all threads
312                attr[thread_uid].attributes = PT_ATTR_CLUSTER_DEFINED | PT_ATTR_CORE_DEFINED;
313                attr[thread_uid].cxy        = CXY_FROM_XY( x , y );
314                attr[thread_uid].lid        = lid;
315
316                if( thread_uid != main_uid )
317                {
318
319get_cycle( &cycle );
320printf("\n### main creates thread_uid %d / &sort_arg %x / cycle %d\n",
321thread_uid, &arg[thread_uid], (unsigned int)cycle );
322
323                    if ( pthread_create( &trdid,              // not used because no join
324                                         &attr[thread_uid],   // thread attributes
325                                         &sort,               // entry function
326                                         &arg[thread_uid] ) ) // sort arguments
327                    {
328                        printf("\n[SORT ERROR] main created thread %x \n", thread_uid );
329                        exit( 0 );
330                    }
331                }
332         
333#if DISPLAY_THREADS
334display_sched( CXY_FROM_XY(x,y) , lid );
335#endif
336            }
337        }
338    }
339
340    get_cycle( &cycle );
341    printf("\n[SORT] main completes threads create at cycle %d\n", (unsigned int)cycle );
342
343    // main run also the sort() function
344    sort( &arg[main_uid] );
345
346    // Check result
347    int    success = 1;
348    int*   res_array = ( (threads==  2) ||
349                         (threads==  8) || 
350                         (threads== 32) || 
351                         (threads==128) || 
352                         (threads==512) ) ? array1 : array0;
353   
354    for( n=0 ; n<(ARRAY_LENGTH-1) ; n++ )
355    {
356        if ( res_array[n] > res_array[n+1] )
357        {
358            printf("\n[SORT] array[%d] = %d > array[%d] = %d\n",
359            n , res_array[n] , n+1 , res_array[n+1] );
360            success = 0;
361            break;
362        }
363    }
364
365#if DISPLAY_ARRAY
366printf("\n*** array after sort\n");
367for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , res_array[n] );
368#endif
369
370    get_cycle( &cycle );
371
372    if ( success )
373    {
374        printf("\n[SORT] success at cycle %d\n", (unsigned int)cycle );
375        exit( 0 );
376    }
377    else
378    {
379        printf("\n[SORT] failure at cycle %d\n", (unsigned int)cycle );
380        exit( 1 );
381    }
382
383}  // end main()
384
385
386/*
387vim: tabstop=4 : shiftwidth=4 : expandtab
388*/
Note: See TracBrowser for help on using the repository browser.