Main Page | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | File Members | Related Pages

dox/Parallel/vtkMultiProcessController.h

Go to the documentation of this file.
00001 /*========================================================================= 00002 00003 Program: Visualization Toolkit 00004 Module: $RCSfile: vtkMultiProcessController.h,v $ 00005 00006 Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen 00007 All rights reserved. 00008 See Copyright.txt or http://www.kitware.com/Copyright.htm for details. 00009 00010 This software is distributed WITHOUT ANY WARRANTY; without even 00011 the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR 00012 PURPOSE. See the above copyright notice for more information. 00013 00014 =========================================================================*/ 00045 #ifndef __vtkMultiProcessController_h 00046 #define __vtkMultiProcessController_h 00047 00048 #include "vtkObject.h" 00049 00050 #include "vtkCommunicator.h" // Needed for direct access to communicator 00051 00052 class vtkDataSet; 00053 class vtkImageData; 00054 class vtkCollection; 00055 class vtkOutputWindow; 00056 class vtkDataObject; 00057 class vtkMultiProcessController; 00058 00059 //BTX 00060 // The type of function that gets called when new processes are initiated. 00061 typedef void (*vtkProcessFunctionType)(vtkMultiProcessController *controller, 00062 void *userData); 00063 00064 // The type of function that gets called when an RMI is triggered. 00065 typedef void (*vtkRMIFunctionType)(void *localArg, 00066 void *remoteArg, int remoteArgLength, 00067 int remoteProcessId); 00068 //ETX 00069 00070 00071 class VTK_PARALLEL_EXPORT vtkMultiProcessController : public vtkObject 00072 { 00073 public: 00074 static vtkMultiProcessController *New(); 00075 vtkTypeRevisionMacro(vtkMultiProcessController,vtkObject); 00076 void PrintSelf(ostream& os, vtkIndent indent); 00077 00081 virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv))=0; 00082 00084 00087 virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv), 00088 int initializedExternally)=0; 00090 00093 virtual void Finalize()=0; 00094 00098 virtual void Finalize(int finalizedExternally)=0; 00099 00101 00104 virtual void SetNumberOfProcesses(int num); 00105 vtkGetMacro( NumberOfProcesses, int ); 00107 00108 //BTX 00110 00113 void SetSingleMethod(vtkProcessFunctionType, void *data); 00114 //ETX 00116 00120 virtual void SingleMethodExecute() = 0; 00121 00122 //BTX 00124 00128 void SetMultipleMethod(int index, vtkProcessFunctionType, void *data); 00129 //ETX 00131 00135 virtual void MultipleMethodExecute() = 0; 00136 00138 virtual int GetLocalProcessId() { return this->LocalProcessId; } 00139 00144 static vtkMultiProcessController *GetGlobalController(); 00145 00148 virtual void CreateOutputWindow() = 0; 00149 00151 00156 vtkSetMacro(ForceDeepCopy, int); 00157 vtkGetMacro(ForceDeepCopy, int); 00158 vtkBooleanMacro(ForceDeepCopy, int); 00160 00161 00162 //------------------ RMIs -------------------- 00163 //BTX 00169 void AddRMI(vtkRMIFunctionType, void *localArg, int tag); 00170 00172 00173 void RemoveRMI(vtkRMIFunctionType f, void *arg, int tag) 00174 {f = f; arg = arg; tag = tag; vtkErrorMacro("RemoveRMI Not Implemented Yet");}; 00175 //ETX 00177 00179 void TriggerRMI(int remoteProcessId, void *arg, int argLength, int tag); 00180 00183 void TriggerBreakRMIs(); 00184 00186 00187 void TriggerRMI(int remoteProcessId, char *arg, int tag) 00188 { this->TriggerRMI(remoteProcessId, (void*)arg, 00189 static_cast<int>(strlen(arg))+1, tag); } 00191 00193 00194 void TriggerRMI(int remoteProcessId, int tag) 00195 { this->TriggerRMI(remoteProcessId, NULL, 0, tag); } 00197 00200 void ProcessRMIs(); 00201 00203 00206 vtkSetMacro(BreakFlag, int); 00207 vtkGetMacro(BreakFlag, int); 00209 00211 vtkGetObjectMacro(Communicator, vtkCommunicator); 00213 00214 //BTX 00215 00216 enum Consts { 00217 MAX_PROCESSES=8192, 00218 ANY_SOURCE=-1, 00219 INVALID_SOURCE=-2, 00220 RMI_TAG=315167, 00221 RMI_ARG_TAG=315168, 00222 BREAK_RMI_TAG=239954 00223 }; 00224 00225 //ETX 00226 00228 virtual void Barrier() = 0; 00229 00230 static void SetGlobalController(vtkMultiProcessController *controller); 00231 00232 //------------------ Communication -------------------- 00233 00235 00237 int Send(int* data, int length, int remoteProcessId, int tag); 00238 int Send(unsigned long* data, int length, int remoteProcessId, 00239 int tag); 00240 int Send(char* data, int length, int remoteProcessId, int tag); 00241 int Send(unsigned char* data, int length, int remoteProcessId, int tag); 00242 int Send(float* data, int length, int remoteProcessId, int tag); 00243 int Send(double* data, int length, int remoteProcessId, int tag); 00244 #ifdef VTK_USE_64BIT_IDS 00245 int Send(vtkIdType* data, int length, int remoteProcessId, int tag); 00247 #endif 00248 int Send(vtkDataObject *data, int remoteId, int tag); 00249 int Send(vtkDataArray *data, int remoteId, int tag); 00250 00252 00255 int Receive(int* data, int length, int remoteProcessId, int tag); 00256 int Receive(unsigned long* data, int length, int remoteProcessId, 00257 int tag); 00258 int Receive(char* data, int length, int remoteProcessId, int tag); 00259 int Receive(unsigned char* data, int length, int remoteProcessId, int tag); 00260 int Receive(float* data, int length, int remoteProcessId, int tag); 00261 int Receive(double* data, int length, int remoteProcessId, int tag); 00262 #ifdef VTK_USE_64BIT_IDS 00263 int Receive(vtkIdType* data, int length, int remoteProcessId, int tag); 00265 #endif 00266 int Receive(vtkDataObject* data, int remoteId, int tag); 00267 int Receive(vtkDataArray* data, int remoteId, int tag); 00268 00269 // Internally implemented RMI to break the process loop. 00270 00271 protected: 00272 vtkMultiProcessController(); 00273 ~vtkMultiProcessController(); 00274 00275 int MaximumNumberOfProcesses; 00276 int NumberOfProcesses; 00277 00278 int LocalProcessId; 00279 00280 vtkProcessFunctionType SingleMethod; 00281 void *SingleData; 00282 vtkProcessFunctionType MultipleMethod[MAX_PROCESSES]; 00283 void *MultipleData[MAX_PROCESSES]; 00284 00285 vtkCollection *RMIs; 00286 00287 // This is a flag that can be used by the ports to break 00288 // their update loop. (same as ProcessRMIs) 00289 int BreakFlag; 00290 00291 void ProcessRMI(int remoteProcessId, void *arg, int argLength, int rmiTag); 00292 00293 // This method implements "GetGlobalController". 00294 // It needs to be virtual and static. 00295 virtual vtkMultiProcessController *GetLocalController(); 00296 00297 00298 // This flag can force deep copies during send. 00299 int ForceDeepCopy; 00300 00301 vtkOutputWindow* OutputWindow; 00302 00303 // Note that since the communicators can be created differently 00304 // depending on the type of controller, the subclasses are 00305 // responsible of deleting them. 00306 vtkCommunicator* Communicator; 00307 00308 // Communicator which is a copy of the current user 00309 // level communicator except the context; i.e. even if the tags 00310 // are the same, the RMI messages will not interfere with user 00311 // level messages. (This only works with MPI. When using threads, 00312 // the tags have to be unique.) 00313 // Note that since the communicators can be created differently 00314 // depending on the type of controller, the subclasses are 00315 // responsible of deleting them. 00316 vtkCommunicator* RMICommunicator; 00317 00318 private: 00319 vtkMultiProcessController(const vtkMultiProcessController&); // Not implemented. 00320 void operator=(const vtkMultiProcessController&); // Not implemented. 00321 }; 00322 00323 00324 inline int vtkMultiProcessController::Send(vtkDataObject *data, 00325 int remoteThreadId, int tag) 00326 { 00327 if (this->Communicator) 00328 { 00329 return this->Communicator->Send(data, remoteThreadId, tag); 00330 } 00331 else 00332 { 00333 return 0; 00334 } 00335 } 00336 00337 inline int vtkMultiProcessController::Send(vtkDataArray *data, 00338 int remoteThreadId, int tag) 00339 { 00340 if (this->Communicator) 00341 { 00342 return this->Communicator->Send(data, remoteThreadId, tag); 00343 } 00344 else 00345 { 00346 return 0; 00347 } 00348 } 00349 00350 inline int vtkMultiProcessController::Send(int* data, int length, 00351 int remoteThreadId, int tag) 00352 { 00353 if (this->Communicator) 00354 { 00355 return this->Communicator->Send(data, length, remoteThreadId, tag); 00356 } 00357 else 00358 { 00359 return 0; 00360 } 00361 } 00362 00363 inline int vtkMultiProcessController::Send(unsigned long* data, 00364 int length, int remoteThreadId, 00365 int tag) 00366 { 00367 if (this->Communicator) 00368 { 00369 return this->Communicator->Send(data, length, remoteThreadId, tag); 00370 } 00371 else 00372 { 00373 return 0; 00374 } 00375 } 00376 00377 inline int vtkMultiProcessController::Send(char* data, int length, 00378 int remoteThreadId, int tag) 00379 { 00380 if (this->Communicator) 00381 { 00382 return this->Communicator->Send(data, length, remoteThreadId, tag); 00383 } 00384 else 00385 { 00386 return 0; 00387 } 00388 } 00389 00390 inline int vtkMultiProcessController::Send(unsigned char* data, int length, 00391 int remoteThreadId, int tag) 00392 { 00393 if (this->Communicator) 00394 { 00395 return this->Communicator->Send(data, length, remoteThreadId, tag); 00396 } 00397 else 00398 { 00399 return 0; 00400 } 00401 } 00402 00403 inline int vtkMultiProcessController::Send(float* data, int length, 00404 int remoteThreadId, int tag) 00405 { 00406 if (this->Communicator) 00407 { 00408 return this->Communicator->Send(data, length, remoteThreadId, tag); 00409 } 00410 else 00411 { 00412 return 0; 00413 } 00414 } 00415 00416 inline int vtkMultiProcessController::Send(double* data, int length, 00417 int remoteThreadId, int tag) 00418 { 00419 if (this->Communicator) 00420 { 00421 return this->Communicator->Send(data, length, remoteThreadId, tag); 00422 } 00423 else 00424 { 00425 return 0; 00426 } 00427 } 00428 00429 #ifdef VTK_USE_64BIT_IDS 00430 inline int vtkMultiProcessController::Send(vtkIdType* data, int length, 00431 int remoteThreadId, int tag) 00432 { 00433 if (this->Communicator) 00434 { 00435 return this->Communicator->Send(data, length, remoteThreadId, tag); 00436 } 00437 else 00438 { 00439 return 0; 00440 } 00441 } 00442 #endif 00443 00444 inline int vtkMultiProcessController::Receive(vtkDataObject* data, 00445 int remoteThreadId, int tag) 00446 { 00447 if (this->Communicator) 00448 { 00449 return this->Communicator->Receive(data, remoteThreadId, tag); 00450 } 00451 else 00452 { 00453 return 0; 00454 } 00455 } 00456 00457 inline int vtkMultiProcessController::Receive(vtkDataArray* data, 00458 int remoteThreadId, int tag) 00459 { 00460 if (this->Communicator) 00461 { 00462 return this->Communicator->Receive(data, remoteThreadId, tag); 00463 } 00464 else 00465 { 00466 return 0; 00467 } 00468 } 00469 00470 inline int vtkMultiProcessController::Receive(int* data, int length, 00471 int remoteThreadId, int tag) 00472 { 00473 if (this->Communicator) 00474 { 00475 return this->Communicator->Receive(data, length, remoteThreadId, tag); 00476 } 00477 else 00478 { 00479 return 0; 00480 } 00481 } 00482 00483 inline int vtkMultiProcessController::Receive(unsigned long* data, 00484 int length,int remoteThreadId, 00485 int tag) 00486 { 00487 if (this->Communicator) 00488 { 00489 return this->Communicator->Receive(data, length, remoteThreadId, tag); 00490 } 00491 else 00492 { 00493 return 0; 00494 } 00495 } 00496 00497 inline int vtkMultiProcessController::Receive(char* data, int length, 00498 int remoteThreadId, int tag) 00499 { 00500 if (this->Communicator) 00501 { 00502 return this->Communicator->Receive(data, length, remoteThreadId, tag); 00503 } 00504 else 00505 { 00506 return 0; 00507 } 00508 } 00509 00510 inline int vtkMultiProcessController::Receive(unsigned char* data, int length, 00511 int remoteThreadId, int tag) 00512 { 00513 if (this->Communicator) 00514 { 00515 return this->Communicator->Receive(data, length, remoteThreadId, tag); 00516 } 00517 else 00518 { 00519 return 0; 00520 } 00521 } 00522 00523 inline int vtkMultiProcessController::Receive(float* data, int length, 00524 int remoteThreadId, int tag) 00525 { 00526 if (this->Communicator) 00527 { 00528 return this->Communicator->Receive(data, length, remoteThreadId, tag); 00529 } 00530 else 00531 { 00532 return 0; 00533 } 00534 } 00535 00536 inline int vtkMultiProcessController::Receive(double* data, int length, 00537 int remoteThreadId, int tag) 00538 { 00539 if (this->Communicator) 00540 { 00541 return this->Communicator->Receive(data, length, remoteThreadId, tag); 00542 } 00543 else 00544 { 00545 return 0; 00546 } 00547 } 00548 00549 #ifdef VTK_USE_64BIT_IDS 00550 inline int vtkMultiProcessController::Receive(vtkIdType* data, int length, 00551 int remoteThreadId, int tag) 00552 { 00553 if (this->Communicator) 00554 { 00555 return this->Communicator->Receive(data, length, remoteThreadId, tag); 00556 } 00557 else 00558 { 00559 return 0; 00560 } 00561 } 00562 #endif 00563 00564 #endif