This module defines mpirun(), a parallel implementation of run() using a distributed-memory approach. Message passing is done mainly with mpi4py; some messages are also handled in C++ (with OpenMPI).

## Logic:

The logic for an initially centralized scene is as follows:

1. Instantiate a complete, ordinary yade scene

2. Insert subdomains as special yade bodies. This is somewhat similar to adding a clump body on top of clump members

3. Broadcast this scene to all workers. In the initialization phase the workers will:

• define the bounding box of their assigned bodies and return it to other workers
• detect which assigned bodies are virtually in interaction with other domains (based on their bounding boxes) and communicate the lists to the relevant workers
• erase the bodies which are neither assigned nor virtually interacting with the subdomain
4. Run a number of ‘regular’ iterations without re-running collision detection (Verlet distance mechanism). In each regular iteration the workers will:

• calculate internal and cross-domains interactions
• execute Newton on assigned bodies (modified Newton skips other domains)
• send updated positions to other workers and partial force on floor to master
5. When one worker triggers collision detection, all workers will follow. This results in updating the intersections between subdomains.

6. If enabled, bodies may be re-allocated to different domains just after a collision detection, based on a filter. Custom filters are possible; one is predefined here (medianFilter)
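The interplay between steps 4 and 5 rests on the Verlet distance criterion: collision detection can be skipped as long as no body has moved further than the Verlet distance since the last detection. Below is a toy, single-process sketch of that criterion; all names are illustrative and not part of the yade.mpy API (in the real code, the local flags are combined across workers so that one trigger makes everyone follow).

```python
# Toy illustration of the Verlet-distance criterion of steps 4-5:
# collision detection is re-run only once some body has moved more than
# verlet_dist since the last detection. Names are ours, not yade's.

def needs_collision_detection(ref_positions, positions, verlet_dist):
    """True if any body moved further than verlet_dist since last detection."""
    return any(abs(p - r) > verlet_dist
               for p, r in zip(ref_positions, positions))

verlet_dist = 0.5
ref = [0.0, 1.0, 2.0]   # positions recorded at the last collision detection
pos = [0.1, 1.2, 2.0]   # current positions: max displacement is 0.2
print(needs_collision_detection(ref, pos, verlet_dist))  # False

pos = [0.1, 1.7, 2.0]   # body 1 moved by 0.7 > 0.5: triggers detection
print(needs_collision_detection(ref, pos, verlet_dist))  # True
```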

## Rules:

• intersections[0] has 0-bodies (to which we need to send forces)
• intersections[thisDomain] has ids of the other domains overlapping the current one
• intersections[otherDomain] has ids of bodies in the _current_ domain which overlap with the other domain (for which we need to send updated pos/vel)
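As a concrete, made-up illustration of these conventions, consider the worker of rank 2 in a world of 4 ranks (master = 0, subdomains 1..3); all ids below are invented for the example:

```python
# Made-up example of the intersections convention for the worker of rank 2
# in a 4-rank world (master=0, subdomains 1..3). Ids are illustrative only.
rank = 2
intersections = [
    [10, 11],    # intersections[0]: ids of bodies here interacting with master
    [12],        # intersections[1]: ids of bodies here overlapping domain 1
    [1, 3],      # intersections[rank]: ranks of domains overlapping domain 2
    [15, 16],    # intersections[3]: ids of bodies here overlapping domain 3
]
# updated pos/vel of the bodies listed for rank r must be sent to r:
for other in intersections[rank]:
    ids_to_send = intersections[other]
    print(other, ids_to_send)
```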

## Hints:

• handle subD.intersections with care (same for mirrorIntersections): subD.intersections.append() will not reach the c++ object; subD.intersections can only be assigned (a list of lists of int)
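The hint above can be mimicked in pure Python: when a property getter returns a converted copy of underlying (here simulated) C++ data, mutating that copy is silently lost, while assignment goes through the setter. The SubD class below is a stand-in for illustration, not yade's class:

```python
# Stand-in showing why subD.intersections.append(...) is lost while plain
# assignment works: the getter returns a fresh Python copy of the wrapped
# storage; only the setter writes back. 'SubD' is a mock, not the yade class.
class SubD:
    def __init__(self):
        self._data = [[1, 2], [3]]   # simulated "C++-side" storage

    @property
    def intersections(self):
        return [list(row) for row in self._data]    # converted copy

    @intersections.setter
    def intersections(self, value):
        self._data = [list(row) for row in value]   # writes back

subD = SubD()
subD.intersections.append([4])   # mutates a temporary copy: change is lost
print(subD.intersections)        # [[1, 2], [3]]

ints = subD.intersections
ints.append([4])
subD.intersections = ints        # assignment reaches the storage
print(subD.intersections)        # [[1, 2], [3], [4]]
```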
yade.mpy.MAX_RANK_OUTPUT = 5

Larger ranks will be skipped in mprint.

yade.mpy.REALLOCATE_FILTER(i, j, giveAway)

Returns bodies in “i” to be assigned to “j”, based on a median split between the center points of the subdomains’ AABBs. If giveAway != 0 (positive or negative), “i” will give/acquire this number of bodies to/from “j” with nothing in return (for load-balancing purposes).

class yade.mpy.Timing_comm(inherits object)[source]
Allgather(timing_name, *args, **kwargs)[source]
Gather(timing_name, *args, **kwargs)[source]
Gatherv(timing_name, *args, **kwargs)[source]
allreduce(timing_name, *args, **kwargs)[source]
bcast(timing_name, *args, **kwargs)[source]
clear()[source]
enable_timing(comm_function)[source]
mpiSendStates(timing_name, *args, **kwargs)[source]
mpiWait(timing_name, *args, **kwargs)[source]
mpiWaitReceived(timing_name, *args, **kwargs)[source]
print_all()[source]
recv(timing_name, *args, **kwargs)[source]
send(timing_name, *args, **kwargs)[source]
yade.mpy.bodyErase(ids)[source]

The parallel version of O.bodies.erase(id). It should be called collectively, else the distributed scenes become inconsistent with each other (even subdomains which don’t have ‘id’ can call it safely). For performance, better call it on a list: bodyErase([i,j,k]).

yade.mpy.checkAndCollide()[source]

Returns True if collision detection needs activation in at least one subdomain, else False. If COPY_MIRROR_BODIES_WHEN_COLLIDE is set, runs the collider when needed and returns False in that case.

yade.mpy.colorDomains()[source]

Apply colors to bodies to reflect their subdomain index

yade.mpy.configure()[source]

Import MPI and define the context. configure() will not spawn workers by itself; that is done by initialize(). OpenMPI environment variables need to be set before calling configure().

yade.mpy.declareMasterInteractive()[source]

Signals that we are in an interactive session, so that TIMEOUT will be reset to 0 (ignored)

yade.mpy.disconnect()[source]

Kill all mpi processes, leaving the python interpreter to rank 0 as in single-threaded execution. The scenes in workers are lost, since further reconnection to mpi will just spawn new processes. The scene in the master thread is left unchanged.

yade.mpy.eraseRemote()[source]
yade.mpy.genLocalIntersections(subdomains)[source]

Defines sets of bodies within current domain overlapping with other domains.

The structure of the data for domain ‘k’ is:

[[id1, id2, …],   <----- intersections[0] = ids of bodies in domain k interacting with the master domain (subdomain k itself excluded)
 [id3, id4, …],   <----- intersections[1] = ids of bodies in domain k interacting with domain rank=1 (subdomain k itself excluded)
 …
 [domain1, domain2, domain3, …],   <----- intersections[k] = ranks (not ids!) of external domains interacting with domain k
 …]

yade.mpy.genUpdatedStates(b_ids)[source]

Return a list of [id, state] (or [id, state, shape] conditionally) to be sent to other workers

yade.mpy.initialize(np)[source]
yade.mpy.isendRecvForces()[source]

Communicate forces from subdomains to master. Warning: the sending sides (everyone but master) must wait() the returned list of requests.

yade.mpy.makeColorScale(n=None)[source]
yade.mpy.makeMpiArgv()[source]
yade.mpy.maskedConnection(b, boolArray)[source]

List bodies within a facet selectively; the ones marked ‘True’ in boolArray (i.e. already selected from another facet) are discarded

yade.mpy.maskedPFacet(b, boolArray)[source]

List bodies within a facet selectively; the ones marked ‘True’ in boolArray (i.e. already selected from another facet) are discarded

yade.mpy.medianFilter(i, j, giveAway)[source]

Returns bodies in “i” to be assigned to “j”, based on a median split between the center points of the subdomains’ AABBs. If giveAway != 0 (positive or negative), “i” will give/acquire this number of bodies to/from “j” with nothing in return (for load-balancing purposes).
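A toy, one-dimensional version of the median split may help fix the idea. The function below is our own simplification (single axis, no giveAway handling); the real filter works on AABB center points of both subdomains:

```python
# Toy 1D version of the median-split reallocation: bodies of domain i whose
# center lies on j's side of the median (computed over both domains' body
# centers) are handed over to j. Names and the 1D restriction are ours.
def median_split(centers_i, centers_j):
    """Return indices (into centers_i) of bodies of i to give to j."""
    merged = sorted(centers_i + centers_j)
    median = merged[len(merged) // 2]
    # decide on which side of the median domain j lies (by mean position)
    j_is_high = sum(centers_j) / len(centers_j) > sum(centers_i) / len(centers_i)
    if j_is_high:
        return [k for k, c in enumerate(centers_i) if c > median]
    return [k for k, c in enumerate(centers_i) if c < median]

# domain i clusters around 0, except one stray body at 5.0 inside j's region
print(median_split([0.0, 0.2, 5.0], [4.0, 4.5, 5.5]))  # [2]
```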

yade.mpy.mergeScene()[source]
yade.mpy.migrateBodies(ids, origin, destination)[source]

Reassign bodies from origin to destination. The function has to be called by both origin (send) and destination (recv). Note: subD.completeSendBodies() will have to be called after a series of reassignments, since subD.sendBodies() is non-blocking.

yade.mpy.mpiStats()[source]
yade.mpy.mpirun(nSteps, np=None, withMerge=False)[source]

Parallel version of O.run() using MPI domain decomposition.

Parameters

nSteps : the number of steps to compute
np : the number of mpi workers (master + subdomains); if np=1 the function falls back to O.run()
withMerge : whether subdomains should be merged into master at the end of the run (default False). If True, the scene in the master process ends in exactly the same state as after O.run(nSteps, True). The merge can be time consuming; it is recommended to activate it only if post-processing or other similar tasks require it.
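A minimal usage sketch follows. Scene construction (bodies with their subdomain assignment, see the Logic section) is omitted, and the import guard is only there so the snippet degrades gracefully outside a yade session:

```python
# Minimal mpirun usage sketch. Scene setup (bodies with their subdomain
# set) is omitted; the try/except only guards against running this
# outside a yade session, where the yade module is unavailable.
try:
    from yade import mpy as mp   # only available inside yade
except ImportError:
    mp = None

if mp is not None:
    mp.initialize(4)                  # master + 3 subdomain workers
    mp.mpirun(1000, withMerge=True)   # 1000 steps, then merge back to master
```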

yade.mpy.mprint(*args, force=False)[source]

Print with rank-reflecting color regardless of mpy.VERBOSE_OUTPUT, still limited to rank<=mpy.MAX_RANK_OUTPUT

yade.mpy.pairOp(talkTo)[source]
yade.mpy.parallelCollide()[source]
yade.mpy.probeRecvMessage(source, tag)[source]
yade.mpy.projectedBounds(i, j)[source]

Returns a sorted list of projections of bounds on a given axis, with bounds taken from the i->j and j->i intersections

yade.mpy.reallocateBodiesPairWiseBlocking(_filter, otherDomain)[source]

Re-assign bodies from/to otherDomain based on the ‘_filter’ argument. Requirement: ‘_filter’ is a function taking the ranks of origin and destination and returning the list of bodies (by index) to be moved. That’s where the decomposition strategy is defined. See medianFilter for an example (used by default).

yade.mpy.reallocateBodiesToSubdomains(_filter=<function medianFilter>, blocking=True)[source]

Re-assign bodies to subdomains based on the ‘_filter’ argument. Requirement: ‘_filter’ is a function taking the ranks of origin and destination and returning the list of bodies (by index) to be moved. That’s where the decomposition strategy is defined. See medianFilter for an example (used by default). This function must be called in parallel; hence, if run interactively, the command needs to be sent explicitly: mp.sendCommand(“all”,”reallocateBodiesToSubdomains(medianFilter)”,True)

yade.mpy.reboundRemoteBodies(ids)[source]

Update states of bodies handled by other workers; the argument ‘states’ is a list of [id, state] (or [id, state, shape] conditionally)

yade.mpy.receiveForces(subdomains)[source]

Accumulate forces from subdomains (only executed by the master process). Should happen after ForceResetter but before Newton and before any other force-dependent engine (e.g. StressController); could be inserted via yade’s PyRunner.

yade.mpy.recordMpiTiming(name, val)[source]

Append val to the list of values defined by ‘name’ in the dictionary timing.mpi

yade.mpy.runOnSynchronouslPairs(workers, command)[source]

Locally (from one worker’s point of view), this function runs interactive mpi tasks defined by ‘command’ on a list of other workers (typically the list of interacting subdomains). Overall, peer-to-peer connections are established so that ‘command’ is executed symmetrically and simultaneously on both sides of each worker pair. I.e., if worker “i” executes “command” with argument “j” (the index of another worker), then by design “j” will execute the same thing with argument “i” simultaneously.

In many cases a similar series of data exchanges can be obtained more simply (and faster) with asynchronous irecv+send, like below.

for w in workers:
    m = comm.irecv(source=w)
    comm.send(data, dest=w)

The above only works if the messages are all known in advance locally, before any communication. If, on the other hand, the interaction with workers[1] depends on the result of a previous interaction with workers[0], it needs synchronous execution, hence this function. Synchronicity is also required if more than one blocking call is present in ‘command’, else there is an obvious deadlock, as if ‘irecv’ were replaced by ‘recv’ in that naive loop. Both cases occur with the ‘medianFilter’ algorithm, which is why we need this synchronous method.

In this function, pair connections are established by the workers in a non-supervised and non-deterministic manner. Each time an interactive communication (i, j) is established, ‘command’ is executed simultaneously by i and j. It is guaranteed that all possible pairs are visited.

The function can be used for all-to-all operations (N^2 pairs), but more interestingly it works with workers=intersections[rank] (O(N) pairs). It can be tested with the dummy function ‘pairOp’: runOnSynchronouslPairs(range(numThreads), pairOp)

command:
a function taking the index of another worker as argument; it can include blocking communications with the other worker, since runOnSynchronouslPairs guarantees that the other worker will be running the command symmetrically.
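The “all pairs are visited” guarantee can be illustrated with a deterministic round-robin schedule (the classic circle method). yade’s actual pairing is non-deterministic, so this single-process sketch (our own names) only demonstrates the coverage property, not the implementation:

```python
# Deterministic round-robin (circle method) schedule over n workers: in each
# round every worker meets exactly one partner, and after n-1 rounds every
# pair has met exactly once. This is only an illustration of the coverage
# guarantee; yade's pairing is non-deterministic.
def round_robin_pairs(n):
    """Yield rounds of disjoint pairs covering all C(n,2) pairs (n even)."""
    ring = list(range(n))
    for _ in range(n - 1):
        yield [tuple(sorted((ring[k], ring[n - 1 - k]))) for k in range(n // 2)]
        ring.insert(1, ring.pop())   # rotate all elements but the first

schedule = list(round_robin_pairs(4))
all_pairs = {p for rnd in schedule for p in rnd}
print(len(all_pairs))   # 6 == C(4,2): every pair met exactly once
```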
yade.mpy.sendCommand(executors, command, wait=True, workerToWorker=False)[source]

Send a command to a worker (or list of workers) from master or from another worker. Accepted executors are “i”, “[i,j,k]”, “slaves”, and “all” (then even master will execute the command).

yade.mpy.sendRecvStates()[source]
yade.mpy.shrinkIntersections()[source]

Reduce intersections and mirrorIntersections to bodies effectively interacting with another stateful body from the current subdomain. This will reduce the number of updates in sendRecvStates. The initial lists are backed up and need to be restored (with all states updated) before collision detection (see checkAndCollide()).

yade.mpy.spawnedProcessWaitCommand()[source]
yade.mpy.splitScene()[source]

Split a monolithic scene into distributed scenes on threads.

Precondition: the bodies have their subdomain number set in the user script

yade.mpy.unboundRemoteBodies()[source]

Turn bounding boxes on/off depending on rank

yade.mpy.updateAllIntersections()[source]
yade.mpy.updateDomainBounds(subdomains)[source]

yade.mpy.updateMirrorOwners()[source]
yade.mpy.updateRemoteStates(states, setBounded=False)[source]
yade.mpy.waitForces()[source]
yade.mpy.wprint(*args)[source]