Main Page | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | File Members | Related Pages

dox/Parallel/vtkMultiProcessController.h

Go to the documentation of this file.
00001 /*========================================================================= 00002 00003 Program: Visualization Toolkit 00004 Module: $RCSfile: vtkMultiProcessController.h,v $ 00005 Language: C++ 00006 00007 Copyright (c) 1993-2002 Ken Martin, Will Schroeder, Bill Lorensen 00008 All rights reserved. 00009 See Copyright.txt or http://www.kitware.com/Copyright.htm for details. 00010 00011 This software is distributed WITHOUT ANY WARRANTY; without even 00012 the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR 00013 PURPOSE. See the above copyright notice for more information. 00014 00015 =========================================================================*/ 00049 #ifndef __vtkMultiProcessController_h 00050 #define __vtkMultiProcessController_h 00051 00052 #include "vtkObject.h" 00053 00054 #include "vtkCommunicator.h" // Needed for direct access to communicator 00055 00056 class vtkDataSet; 00057 class vtkImageData; 00058 class vtkCollection; 00059 class vtkOutputWindow; 00060 class vtkDataObject; 00061 class vtkMultiProcessController; 00062 00063 //BTX 00064 // The type of function that gets called when new processes are initiated. 00065 typedef void (*vtkProcessFunctionType)(vtkMultiProcessController *controller, 00066 void *userData); 00067 00068 // The type of function that gets called when an RMI is triggered. 00069 typedef void (*vtkRMIFunctionType)(void *localArg, 00070 void *remoteArg, int remoteArgLength, 00071 int remoteProcessId); 00072 //ETX 00073 00074 00075 class VTK_PARALLEL_EXPORT vtkMultiProcessController : public vtkObject 00076 { 00077 public: 00078 static vtkMultiProcessController *New(); 00079 vtkTypeRevisionMacro(vtkMultiProcessController,vtkObject); 00080 void PrintSelf(ostream& os, vtkIndent indent); 00081 00085 virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv))=0; 00086 00088 00091 virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv), 00092 int initializedExternally)=0; 00094 00097 virtual void Finalize()=0; 00098 00102 virtual void Finalize(int finalizedExternally)=0; 00103 00105 00108 virtual void SetNumberOfProcesses(int num); 00109 vtkGetMacro( NumberOfProcesses, int ); 00111 00112 //BTX 00114 00117 void SetSingleMethod(vtkProcessFunctionType, void *data); 00118 //ETX 00120 00124 virtual void SingleMethodExecute() = 0; 00125 00126 //BTX 00128 00132 void SetMultipleMethod(int index, vtkProcessFunctionType, void *data); 00133 //ETX 00135 00139 virtual void MultipleMethodExecute() = 0; 00140 00142 virtual int GetLocalProcessId() { return this->LocalProcessId; } 00143 00148 static vtkMultiProcessController *GetGlobalController(); 00149 00152 virtual void CreateOutputWindow() = 0; 00153 00155 00160 vtkSetMacro(ForceDeepCopy, int); 00161 vtkGetMacro(ForceDeepCopy, int); 00162 vtkBooleanMacro(ForceDeepCopy, int); 00164 00165 00166 //------------------ RMIs -------------------- 00167 //BTX 00173 void AddRMI(vtkRMIFunctionType, void *localArg, int tag); 00174 00176 00177 void RemoveRMI(vtkRMIFunctionType f, void *arg, int tag) 00178 {f = f; arg = arg; tag = tag; vtkErrorMacro("RemoveRMI Not Implemented Yet");}; 00179 //ETX 00181 00183 void TriggerRMI(int remoteProcessId, void *arg, int argLength, int tag); 00184 00187 void TriggerBreakRMIs(); 00188 00190 00191 void TriggerRMI(int remoteProcessId, char *arg, int tag) 00192 { this->TriggerRMI(remoteProcessId, (void*)arg, 00193 static_cast<int>(strlen(arg))+1, tag); } 00195 00197 00198 void TriggerRMI(int remoteProcessId, int tag) 00199 { this->TriggerRMI(remoteProcessId, NULL, 0, tag); } 00201 00204 void ProcessRMIs(); 00205 00207 00210 vtkSetMacro(BreakFlag, int); 00211 vtkGetMacro(BreakFlag, int); 00213 00215 vtkGetObjectMacro(Communicator, vtkCommunicator); 00217 00218 //BTX 00219 00220 enum Consts { 00221 MAX_PROCESSES=8192, 00222 ANY_SOURCE=-1, 00223 INVALID_SOURCE=-2, 00224 RMI_TAG=315167, 00225 RMI_ARG_TAG=315168, 00226 BREAK_RMI_TAG=239954 00227 }; 00228 00229 //ETX 00230 00232 virtual void Barrier() = 0; 00233 00234 static void SetGlobalController(vtkMultiProcessController *controller); 00235 00236 //------------------ Communication -------------------- 00237 00239 00241 int Send(int* data, int length, int remoteProcessId, int tag); 00242 int Send(unsigned long* data, int length, int remoteProcessId, 00243 int tag); 00244 int Send(char* data, int length, int remoteProcessId, int tag); 00245 int Send(unsigned char* data, int length, int remoteProcessId, int tag); 00246 int Send(float* data, int length, int remoteProcessId, int tag); 00247 int Send(double* data, int length, int remoteProcessId, int tag); 00248 #ifdef VTK_USE_64BIT_IDS 00249 int Send(vtkIdType* data, int length, int remoteProcessId, int tag); 00251 #endif 00252 int Send(vtkDataObject *data, int remoteId, int tag); 00253 int Send(vtkDataArray *data, int remoteId, int tag); 00254 00256 00259 int Receive(int* data, int length, int remoteProcessId, int tag); 00260 int Receive(unsigned long* data, int length, int remoteProcessId, 00261 int tag); 00262 int Receive(char* data, int length, int remoteProcessId, int tag); 00263 int Receive(unsigned char* data, int length, int remoteProcessId, int tag); 00264 int Receive(float* data, int length, int remoteProcessId, int tag); 00265 int Receive(double* data, int length, int remoteProcessId, int tag); 00266 #ifdef VTK_USE_64BIT_IDS 00267 int Receive(vtkIdType* data, int length, int remoteProcessId, int tag); 00269 #endif 00270 int Receive(vtkDataObject* data, int remoteId, int tag); 00271 int Receive(vtkDataArray* data, int remoteId, int tag); 00272 00273 // Internally implemented RMI to break the process loop. 00274 00275 protected: 00276 vtkMultiProcessController(); 00277 ~vtkMultiProcessController(); 00278 00279 int MaximumNumberOfProcesses; 00280 int NumberOfProcesses; 00281 00282 int LocalProcessId; 00283 00284 vtkProcessFunctionType SingleMethod; 00285 void *SingleData; 00286 vtkProcessFunctionType MultipleMethod[MAX_PROCESSES]; 00287 void *MultipleData[MAX_PROCESSES]; 00288 00289 vtkCollection *RMIs; 00290 00291 // This is a flag that can be used by the ports to break 00292 // their update loop. (same as ProcessRMIs) 00293 int BreakFlag; 00294 00295 void ProcessRMI(int remoteProcessId, void *arg, int argLength, int rmiTag); 00296 00297 // This method implements "GetGlobalController". 00298 // It needs to be virtual and static. 00299 virtual vtkMultiProcessController *GetLocalController(); 00300 00301 00302 // This flag can force deep copies during send. 00303 int ForceDeepCopy; 00304 00305 vtkOutputWindow* OutputWindow; 00306 00307 // Note that since the communicators can be created differently 00308 // depending on the type of controller, the subclasses are 00309 // responsible of deleting them. 00310 vtkCommunicator* Communicator; 00311 00312 // Communicator which is a copy of the current user 00313 // level communicator except the context; i.e. even if the tags 00314 // are the same, the RMI messages will not interfere with user 00315 // level messages. (This only works with MPI. When using threads, 00316 // the tags have to be unique.) 00317 // Note that since the communicators can be created differently 00318 // depending on the type of controller, the subclasses are 00319 // responsible of deleting them. 00320 vtkCommunicator* RMICommunicator; 00321 00322 private: 00323 vtkMultiProcessController(const vtkMultiProcessController&); // Not implemented. 00324 void operator=(const vtkMultiProcessController&); // Not implemented. 00325 }; 00326 00327 00328 inline int vtkMultiProcessController::Send(vtkDataObject *data, 00329 int remoteThreadId, int tag) 00330 { 00331 if (this->Communicator) 00332 { 00333 return this->Communicator->Send(data, remoteThreadId, tag); 00334 } 00335 else 00336 { 00337 return 0; 00338 } 00339 } 00340 00341 inline int vtkMultiProcessController::Send(vtkDataArray *data, 00342 int remoteThreadId, int tag) 00343 { 00344 if (this->Communicator) 00345 { 00346 return this->Communicator->Send(data, remoteThreadId, tag); 00347 } 00348 else 00349 { 00350 return 0; 00351 } 00352 } 00353 00354 inline int vtkMultiProcessController::Send(int* data, int length, 00355 int remoteThreadId, int tag) 00356 { 00357 if (this->Communicator) 00358 { 00359 return this->Communicator->Send(data, length, remoteThreadId, tag); 00360 } 00361 else 00362 { 00363 return 0; 00364 } 00365 } 00366 00367 inline int vtkMultiProcessController::Send(unsigned long* data, 00368 int length, int remoteThreadId, 00369 int tag) 00370 { 00371 if (this->Communicator) 00372 { 00373 return this->Communicator->Send(data, length, remoteThreadId, tag); 00374 } 00375 else 00376 { 00377 return 0; 00378 } 00379 } 00380 00381 inline int vtkMultiProcessController::Send(char* data, int length, 00382 int remoteThreadId, int tag) 00383 { 00384 if (this->Communicator) 00385 { 00386 return this->Communicator->Send(data, length, remoteThreadId, tag); 00387 } 00388 else 00389 { 00390 return 0; 00391 } 00392 } 00393 00394 inline int vtkMultiProcessController::Send(unsigned char* data, int length, 00395 int remoteThreadId, int tag) 00396 { 00397 if (this->Communicator) 00398 { 00399 return this->Communicator->Send(data, length, remoteThreadId, tag); 00400 } 00401 else 00402 { 00403 return 0; 00404 } 00405 } 00406 00407 inline int vtkMultiProcessController::Send(float* data, int length, 00408 int remoteThreadId, int tag) 00409 { 00410 if (this->Communicator) 00411 { 00412 return this->Communicator->Send(data, length, remoteThreadId, tag); 00413 } 00414 else 00415 { 00416 return 0; 00417 } 00418 } 00419 00420 inline int vtkMultiProcessController::Send(double* data, int length, 00421 int remoteThreadId, int tag) 00422 { 00423 if (this->Communicator) 00424 { 00425 return this->Communicator->Send(data, length, remoteThreadId, tag); 00426 } 00427 else 00428 { 00429 return 0; 00430 } 00431 } 00432 00433 #ifdef VTK_USE_64BIT_IDS 00434 inline int vtkMultiProcessController::Send(vtkIdType* data, int length, 00435 int remoteThreadId, int tag) 00436 { 00437 if (this->Communicator) 00438 { 00439 return this->Communicator->Send(data, length, remoteThreadId, tag); 00440 } 00441 else 00442 { 00443 return 0; 00444 } 00445 } 00446 #endif 00447 00448 inline int vtkMultiProcessController::Receive(vtkDataObject* data, 00449 int remoteThreadId, int tag) 00450 { 00451 if (this->Communicator) 00452 { 00453 return this->Communicator->Receive(data, remoteThreadId, tag); 00454 } 00455 else 00456 { 00457 return 0; 00458 } 00459 } 00460 00461 inline int vtkMultiProcessController::Receive(vtkDataArray* data, 00462 int remoteThreadId, int tag) 00463 { 00464 if (this->Communicator) 00465 { 00466 return this->Communicator->Receive(data, remoteThreadId, tag); 00467 } 00468 else 00469 { 00470 return 0; 00471 } 00472 } 00473 00474 inline int vtkMultiProcessController::Receive(int* data, int length, 00475 int remoteThreadId, int tag) 00476 { 00477 if (this->Communicator) 00478 { 00479 return this->Communicator->Receive(data, length, remoteThreadId, tag); 00480 } 00481 else 00482 { 00483 return 0; 00484 } 00485 } 00486 00487 inline int vtkMultiProcessController::Receive(unsigned long* data, 00488 int length,int remoteThreadId, 00489 int tag) 00490 { 00491 if (this->Communicator) 00492 { 00493 return this->Communicator->Receive(data, length, remoteThreadId, tag); 00494 } 00495 else 00496 { 00497 return 0; 00498 } 00499 } 00500 00501 inline int vtkMultiProcessController::Receive(char* data, int length, 00502 int remoteThreadId, int tag) 00503 { 00504 if (this->Communicator) 00505 { 00506 return this->Communicator->Receive(data, length, remoteThreadId, tag); 00507 } 00508 else 00509 { 00510 return 0; 00511 } 00512 } 00513 00514 inline int vtkMultiProcessController::Receive(unsigned char* data, int length, 00515 int remoteThreadId, int tag) 00516 { 00517 if (this->Communicator) 00518 { 00519 return this->Communicator->Receive(data, length, remoteThreadId, tag); 00520 } 00521 else 00522 { 00523 return 0; 00524 } 00525 } 00526 00527 inline int vtkMultiProcessController::Receive(float* data, int length, 00528 int remoteThreadId, int tag) 00529 { 00530 if (this->Communicator) 00531 { 00532 return this->Communicator->Receive(data, length, remoteThreadId, tag); 00533 } 00534 else 00535 { 00536 return 0; 00537 } 00538 } 00539 00540 inline int vtkMultiProcessController::Receive(double* data, int length, 00541 int remoteThreadId, int tag) 00542 { 00543 if (this->Communicator) 00544 { 00545 return this->Communicator->Receive(data, length, remoteThreadId, tag); 00546 } 00547 else 00548 { 00549 return 0; 00550 } 00551 } 00552 00553 #ifdef VTK_USE_64BIT_IDS 00554 inline int vtkMultiProcessController::Receive(vtkIdType* data, int length, 00555 int remoteThreadId, int tag) 00556 { 00557 if (this->Communicator) 00558 { 00559 return this->Communicator->Receive(data, length, remoteThreadId, tag); 00560 } 00561 else 00562 { 00563 return 0; 00564 } 00565 } 00566 #endif 00567 00568 #endif