mpi_utilities.cpp
/* This file is part of Cloudy and is copyright (C)1978-2022 by Gary J. Ferland and
 * others. For conditions of distribution and use see copyright notice in license.txt */
#include "cddefines.h"
#include "save.h"
#include "dynamics.h"
#include "grid.h"
#include "service.h"
#include "rfield.h"
#if defined(__unix) || defined(__APPLE__)
#include <cstddef>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#else
#define pid_t int
#define fork() TotalInsanityAsStub<pid_t>()
#define wait(X) TotalInsanityAsStub<pid_t>()
#endif
#ifdef __linux__
#include <sys/vfs.h>
// these come from /usr/include/linux/magic.h, but from linux kernel 3.7
// onwards, the file seems to have moved to /usr/include/uapi/linux/magic.h...
// to avoid the mess we copy the constants here
static const long NFS_SUPER_MAGIC = 0x6969L;
static const long SMB_SUPER_MAGIC = 0x517BL;
static const long AUTOFS_SUPER_MAGIC = 0x0187L;
// this one is not included in my header file (sshfs is also based on fuse)
static const long FUSE_SUPER_MAGIC = 0x65735546L;
#endif

STATIC void GridGatherOutput(const string&,long,const vector<long>&);
STATIC void GridGatherOutputSequential(const string&,long);
STATIC void GridGatherOutputParallel(const string&,long,const vector<long>&);
STATIC bool lgIsRemote(const string&);
STATIC void check_grid_file(const string&,int,int);

#ifndef MPI_ENABLED

// stub definitions for the MPI symbols used below; these are only needed
// when Cloudy is compiled without MPI support

int MPI_SUCCESS = 0;
int MPI_ERR_INTERN = -1;

int total_insanity(MPI_File, int, MPI_Status*)
{
	return TotalInsanityAsStub<int>();
}

#endif

// NB NB this routine cannot throw any exceptions as it is executed outside
// the try{} block -- this includes mechanisms like ASSERT and cdEXIT!
void load_balance::init( unsigned int nJobs, unsigned int nCPU )
{
	if( nJobs == 0 )
		return;

	bool lgMPI = cpu.i().lgMPI();

	p_jobs.resize( nJobs );
	p_rank = cpu.i().nRANK();
	p_ptr = p_rank;
	p_ncpu = min(nCPU,nJobs);
	// the master rank will now set up a random sequence for the jobs
	// this way we hope to get statistical load balancing of the ranks
	if( p_rank == 0 )
	{
		for( unsigned int i=0; i < nJobs; ++i )
			p_jobs[i] = i;

		if( p_ncpu > 1 )
		{
			// This may or may not seed the random number generator used by random_shuffle.
			// The C++11 solution using <random> only works with g++ 6 or higher...
			srand( unsigned( time(NULL) ) );
			random_shuffle( p_jobs.begin(), p_jobs.end() );
		}
	}
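	// as an illustration: with nJobs == 4 the shuffle might leave the master
	// with p_jobs == { 2, 0, 3, 1 }; that one sequence is then shared with
	// every rank below, so all ranks agree on the job order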
	// now broadcast the random sequence to the other ranks...
	if( lgMPI )
		MPI_Bcast( &p_jobs[0], nJobs, MPI_type(p_jobs[0]), 0, MPI_COMM_WORLD );
	else
		for( unsigned int i=1; i < p_ncpu; ++i )
		{
			fflush(NULL);
			pid_t pid = fork();
			if( pid < 0 )
			{
				fprintf( ioQQQ, "creating the child process failed\n" );
				// this _should_ be exit() not cdEXIT()!
				exit(EXIT_FAILURE);
			}
			else if( pid == 0 )
			{
				/* this is child process */
				p_rank = i;
				p_ptr = p_rank;
				cpu.i().set_nRANK( p_rank );
				return;
			}
		}
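	// each child process returns from inside the loop above with its own rank
	// 1 <= i < p_ncpu, while the parent falls through and continues as rank 0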
}

void load_balance::finalize( exit_type exit_status )
{
	// wait for all jobs to finish
	if( cpu.i().lgMPI() )
		MPI_Barrier( MPI_COMM_WORLD );
	else
	{
		if( p_rank == 0 )
			for( unsigned int i=1; i < p_ncpu; ++i )
				(void)wait(NULL);
		else
			// this _should_ be exit() not cdEXIT()!
			exit(exit_status);
	}
}

void process_output()
{
	DEBUG_ENTRY( "process_output()" );

	// NOTE: when this routine is called all file handles have already been closed

	try
	{
		string main_output = save.chRedirectPrefix + ".out";

		// balance work over the ranks
		// rank n will process file numbers i with bound[n] <= i < bound[n+1]
		// in non-MPI runs, this will result in:
		// bound[0] = 0; bound[1] = grid.totNumModels;
		int nCPU = cpu.i().lgMPI() ? cpu.i().nCPU() : 1;
		int nRANK = cpu.i().lgMPI() ? cpu.i().nRANK() : 0;
		long stride = grid.totNumModels/nCPU;
		vector<long> bound( nCPU+1, stride );
		long remainder = grid.totNumModels - stride*nCPU;
		for( long i=1; i <= remainder; ++i )
			++bound[i];
		bound[0] = 0;
		for( long i=1; i <= nCPU; ++i )
			bound[i] += bound[i-1];

		ASSERT( bound[nCPU] == grid.totNumModels );
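		// worked example: with grid.totNumModels == 10 and nCPU == 4 we get
		// stride == 2 and remainder == 2, so bound == { 0, 3, 6, 8, 10 };
		// ranks 0 and 1 handle 3 models each, ranks 2 and 3 handle 2 each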

		// first process main output files
		if( !grid.lgKeepMainOutputSeparate )
			GridGatherOutput( main_output, grid.totNumModels, bound );

		// remove input files for individual grid points
		for( long j=0; j < grid.totNumModels; ++j )
		{
			if( j >= bound[nRANK] && j < bound[nRANK+1] )
			{
				string in_name = GridPointPrefix(j) + save.chRedirectPrefix + ".in";
				remove( in_name.c_str() );
			}
		}

		ASSERT( long(save.chFileName.size()) == save.nsave );

		for( long ipPun=0; ipPun < save.nsave; ++ipPun )
		{
			string fnam = save.chFilenamePrefix;
			fnam += save.chFileName[ipPun];
			// first do a minimal check on the validity of the save files
			for( int j=0; j < grid.totNumModels; ++j )
				if( j >= bound[nRANK] && j < bound[nRANK+1] )
					check_grid_file( fnam, j, ipPun );
			// and concatenate the output if necessary
			if( save.lgXSPEC[ipPun] )
			{
				if( cpu.i().lgMaster() )
				{
					ASSERT( save.FITStype[ipPun] >= 0 &&
						save.FITStype[ipPun] < NUM_OUTPUT_TYPES );

					// combine the grid.Spectra data from all ranks.
					// this is done by reading the results from file.
					for( int j=0; j < grid.totNumModels; ++j )
					{
						string gridnam = GridPointPrefix(j) + fnam;
						rd_block( &grid.Spectra[save.FITStype[ipPun]][j][0],
							  size_t(rfield.nflux)*sizeof(realnum),
							  gridnam.c_str() );
						remove( gridnam.c_str() );
					}

					FILE *dest = open_data( fnam.c_str(), "ab" );
					// dest points to an empty file, so generate the complete FITS file now
					saveFITSfile( dest, save.FITStype[ipPun], save.punarg[ipPun][0],
						      save.punarg[ipPun][1], save.punarg[ipPun][2] );
					fseek( dest, 0, SEEK_END );
					ASSERT( ftell(dest)%2880 == 0 );
					fclose( dest );
				}
			}
			else if( save.lgSaveToSeparateFiles[ipPun] )
			{
				if( cpu.i().lgMaster() )
				{
					// open in binary mode in case we are writing a FITS file
					FILE *dest = open_data( fnam.c_str(), "ab" );
					// keep the save files for each grid point separate
					// the main save file contains the save header
					// salvage it by prepending it to the first save file
					string gridnam = GridPointPrefix(0) + fnam;
					append_file( dest, gridnam.c_str() );
					fclose( dest );
					// this will overwrite the old file gridnam
					rename( fnam.c_str(), gridnam.c_str() );
				}
			}
			else
			{
				GridGatherOutput( fnam, grid.totNumModels, bound );
			}
		}
	}
	catch( ... )
	{
		fprintf( ioQQQ, "PROBLEM - an internal error occurred while post-processing the grid output\n" );
	}
}

STATIC void GridGatherOutput(const string& basenam,
			     long nfiles,
			     const vector<long>& bound)
{
	if( cpu.i().lgMPI() && !lgIsRemote(basenam) )
		GridGatherOutputParallel( basenam, nfiles, bound );
	else {
		if( cpu.i().lgMaster() )
			GridGatherOutputSequential( basenam, nfiles );
	}
}

STATIC void GridGatherOutputSequential(const string& basenam,
				       long nfiles)
{
	// open in binary mode in case we are writing a FITS file
	FILE* output_handle = open_data( basenam.c_str(), "ab", AS_LOCAL_ONLY_TRY );
	if( output_handle == NULL )
		return;

	for( long j=0; j < nfiles; ++j )
	{
		string gridnam = GridPointPrefix(j) + basenam;
		append_file( output_handle, gridnam.c_str() );
		remove( gridnam.c_str() );
	}
	fclose( output_handle );
}

STATIC void GridGatherOutputParallel(const string& basenam,
				     long nfiles,
				     const vector<long>& bound)
{
	// determine total amount of data each rank has to copy
	// by summing the individual file sizes
	MPI_Offset mySize = 0;
	for( long j=0; j < nfiles; ++j )
	{
		if( j >= bound[cpu.i().nRANK()] && j < bound[cpu.i().nRANK()+1] )
		{
			string gridnam = GridPointPrefix(j) + basenam;
			FILE* fh = open_data( gridnam.c_str(), "r", AS_LOCAL_ONLY_TRY );
			if( fh != NULL )
			{
				fseek( fh, 0, SEEK_END );
				mySize += static_cast<MPI_Offset>( ftell(fh) );
				fclose(fh);
			}
		}
	}

	// broadcast the computed amounts to all ranks so that each
	// rank can compute the offset where it needs to start writing
	vector<MPI_Offset> offset(cpu.i().nCPU());
	for( int i=0; i < cpu.i().nCPU(); ++i )
	{
		MPI_Offset myCopy = mySize;
		// directly using &offset[i] below instead of the detour via
		// &myCopy leads to segfaults for reasons that I cannot fathom...
		MPI_Bcast( &myCopy, 1, MPI_type(myCopy), i, MPI_COMM_WORLD );
		offset[i] = myCopy;
	}

	MPI_File output_handle = open_data( basenam.c_str(), mpi_mode_a, AS_LOCAL_ONLY_TRY );
	if( output_handle == MPI_FILE_NULL )
		return;

	// compute offset where each rank needs to start writing
	MPI_Offset totalSize = 0;
	(void)MPI_File_get_size( output_handle, &totalSize );
	for( int j=0; j < cpu.i().nCPU(); ++j )
	{
		MPI_Offset tmp = offset[j];
		offset[j] = totalSize;
		totalSize += tmp;
	}
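	// worked example: if the per-rank data sizes are { 100, 80, 120 } bytes and
	// the gathered file already holds 50 bytes, the loop above turns offset[]
	// into the exclusive prefix sum { 50, 150, 230 }, so each rank writes into
	// its own non-overlapping region of the shared file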

	// now gather the output and remove the individual files
	(void)MPI_File_set_view( output_handle, offset[cpu.i().nRANK()],
				 MPI_CHAR, MPI_CHAR, const_cast<char*>("native"),
				 MPI_INFO_NULL );
	for( long j=0; j < nfiles; ++j )
	{
		if( j >= bound[cpu.i().nRANK()] && j < bound[cpu.i().nRANK()+1] )
		{
			string gridnam = GridPointPrefix(j) + basenam;
			append_file( output_handle, gridnam.c_str() );
			remove( gridnam.c_str() );
		}
	}
	MPI_File_close( &output_handle );
}

// determine if a file resides on a remote share
// this is needed to determine if MPI-IO can operate safely on the file
#ifdef __linux__
STATIC bool lgIsRemote(const string& fnam)
{
	struct statfs buf;
	int res = statfs( fnam.c_str(), &buf );
	if( res != 0 )
		return true;
	// parallel file systems do not count as remote since MPI-IO is supported on those
	if( buf.f_type == NFS_SUPER_MAGIC ||
	    buf.f_type == SMB_SUPER_MAGIC ||
	    buf.f_type == AUTOFS_SUPER_MAGIC ||
	    buf.f_type == FUSE_SUPER_MAGIC )
		return true;
	else
		return false;
}
#else
STATIC bool lgIsRemote(const string&)
{
	// we do not know how to determine this, so we assume the worst
	return true;
}
#endif

STATIC void check_grid_file( const string& fnam, int j, int ipPun )
{
	DEBUG_ENTRY( "check_grid_file()" );

	// these are binary files, don't touch them...
	if( save.lgFITS[ipPun] )
		return;

	bool lgForceNoDelimiter = false;
	// in these cases there should not be a GRID_DELIMIT string...
	if( !save.lgHashEndIter[ipPun] || !save.lg_separate_iterations[ipPun] ||
	    dynamics.lgTimeDependentStatic || strcmp( save.chHashString, "TIME_DEP" ) == 0 ||
	    strcmp( save.chHashString, "\n" ) == 0 )
		lgForceNoDelimiter = true;

	bool lgAppendDelimiter = true;
	bool lgAppendNewline = false;
	string gridnam = GridPointPrefix(j) + fnam;
	fstream str;
	open_data( str, gridnam.c_str(), mode_r, AS_LOCAL_ONLY_TRY );
	if( str.is_open() )
	{
		str.seekg( 0, ios_base::end );
		if( str.good() && str.tellg() > 0 )
		{
			// check if the file ends in a newline
			str.seekg( -1, ios_base::cur );
			char chr;
			str.get( chr );
			lgAppendNewline = ( chr != '\n' );
			// check if the GRID_DELIMIT string is present
			string line;
			str.seekg( 0, ios_base::beg );
			while( getline( str, line ) )
			{
				if( line.find( "GRID_DELIMIT" ) != string::npos )
					lgAppendDelimiter = false;
			}
		}
		str.close();
	}
	if( lgForceNoDelimiter )
		lgAppendDelimiter = false;
	if( lgAppendNewline || lgAppendDelimiter )
	{
		open_data( str, gridnam.c_str(), mode_a, AS_LOCAL_ONLY_TRY );
		if( str.is_open() )
		{
			if( lgAppendNewline )
				str << endl;
			if( lgAppendDelimiter )
			{
				str << save.chHashString << " GRID_DELIMIT -- grid";
				str << setfill( '0' ) << setw(9) << j << endl;
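				// the appended delimiter is save.chHashString followed by
				// " GRID_DELIMIT -- grid" and the zero-padded grid index,
				// e.g. "... GRID_DELIMIT -- grid000000003" for j == 3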
			}
			str.close();
		}
	}
}

void append_file( FILE *dest, const char *source )
{
	DEBUG_ENTRY( "append_file()" );

	FILE *src = open_data( source, "rb", AS_LOCAL_ONLY_TRY );
	if( src == NULL )
		return;

	// limited testing shows that using a 4 KiB buffer should
	// give performance that is at least very close to optimal
	// tests were done by concatenating 10 copies of a 62.7 MiB file
	const size_t BUF_SIZE = 4096;
	char buf[BUF_SIZE];

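	// note that the while( !feof() ) idiom is safe here: once fread() reaches
	// the end of the file it returns a short (possibly zero) count, and
	// writing zero bytes with fwrite() is harmless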
	while( ! feof(src) )
	{
		size_t nb = fread( buf, sizeof(char), BUF_SIZE, src );
		fwrite( buf, sizeof(char), nb, dest );
	}
	fclose(src);
	return;
}

void append_file( MPI_File dest, const char *source )
{
	DEBUG_ENTRY( "append_file()" );

	FILE *src = open_data( source, "rb", AS_LOCAL_ONLY_TRY );
	if( src == NULL )
		return;

	// use larger buffer for parallel file systems
	const size_t BUF_SIZE = 32768;
	char buf[BUF_SIZE];

	while( ! feof(src) )
	{
		size_t nb = fread( buf, sizeof(char), BUF_SIZE, src );
		MPI_Status status;
		(void)MPI_File_write( dest, buf, nb, MPI_CHAR, &status );
	}
	fclose(src);
	return;
}