Task synchronization is traditionally handled by the ProActive Scheduler through Task
Dependencies.
Task Dependencies do not allow fine-grained synchronization (for example, between
sibling tasks or across multiple workflows).
The Task Synchronization API is the tool to use when complex synchronization patterns
are needed.
The Task Synchronization API works as a key-value store organized in channels.
Each channel has a unique name and behaves as a HashMap.
This means that each put operation is identified by a 3-tuple: (channel, key, value).
Users of the synchronization API can create or remove channels and put key/value pairs
inside channels.
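The channel model described above can be pictured as a map of maps. The sketch below is a local stand-in written only to illustrate the (channel, key, value) addressing; the class `ChannelStoreSketch` and its method signatures are assumptions made for this example and are not the ProActive implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical local model of the channel/key/value layout; not the ProActive service.
class ChannelStoreSketch {
    // Channel name -> that channel's own key/value map.
    private final Map<String, Map<String, Object>> channels = new HashMap<>();

    void createChannel(String name) {
        channels.putIfAbsent(name, new HashMap<>());
    }

    void deleteChannel(String name) {
        channels.remove(name);
    }

    // A put is addressed by (channel, key, value); returns the previous value, or null.
    Object put(String channel, String key, Object value) {
        return requireChannel(channel).put(key, value);
    }

    Object get(String channel, String key) {
        return requireChannel(channel).get(key);
    }

    private Map<String, Object> requireChannel(String channel) {
        Map<String, Object> map = channels.get(channel);
        if (map == null) {
            throw new IllegalStateException("No such channel: " + channel);
        }
        return map;
    }

    public static void main(String[] args) {
        ChannelStoreSketch store = new ChannelStoreSketch();
        store.createChannel("28");
        store.createChannel("29");
        store.put("28", "lock", true);
        store.put("29", "lock", false);
        // The same key in two channels holds two independent values.
        System.out.println(store.get("28", "lock") + " " + store.get("29", "lock"));
    }
}
```

Because every entry lives inside a named channel, two jobs can use the same key (here "lock") without interfering, provided they use different channel names.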
The synchronization service is started automatically with the scheduler.
Channels can be made persistent (preserved across scheduler restarts) or kept in
memory only.
Each channel is implemented internally as a Java HashMap, so all methods defined in the
Java 8 Map interface can be used, with some signature changes.
Importantly, the lambda-related methods make it possible to perform operations on the
map, such as computing a new value from the current one, instead of simply replacing
entries.
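To make the difference between replacing an entry and operating on it concrete, here is a small sketch on a plain `java.util.HashMap`. It uses only standard Java 8 `Map` methods (`put`, `compute`, `merge`, `computeIfAbsent`); the Synchronization API variants additionally take a channel name and, per the text, have somewhat different signatures, so this illustrates the Map semantics only.

```java
import java.util.HashMap;
import java.util.Map;

class MapLambdaSketch {
    static Map<String, Integer> demo() {
        Map<String, Integer> channel = new HashMap<>();

        // put simply replaces whatever was stored under the key.
        channel.put("counter", 0);
        channel.put("counter", 10);

        // compute derives the new value from the current one.
        channel.compute("counter", (k, x) -> x + 1);

        // merge inserts the value if the key is absent, otherwise combines old and new.
        channel.merge("hits", 1, Integer::sum);
        channel.merge("hits", 1, Integer::sum);

        // computeIfAbsent only writes when the key has no value yet.
        channel.computeIfAbsent("retries", k -> 3);
        channel.computeIfAbsent("retries", k -> 99);

        return channel;
    }

    public static void main(String[] args) {
        Map<String, Integer> result = demo();
        System.out.println(result.get("counter")); // prints 11
        System.out.println(result.get("hits"));    // prints 2
        System.out.println(result.get("retries")); // prints 3
    }
}
```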
A key point is that all operations on the synchronization API are atomic. In other
words, two operations coming from two different tasks do not run concurrently.
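The practical consequence of atomicity is that a read-modify-write such as an increment cannot lose updates. The sketch below imitates that guarantee locally with `synchronized` methods and two threads standing in for two tasks; `AtomicChannelSketch` is a hypothetical class for illustration, and how the actual service enforces atomicity is not described in this text.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical stand-in: one channel whose operations never overlap.
class AtomicChannelSketch {
    private final Map<String, Integer> entries = new HashMap<>();

    // synchronized: only one caller at a time can run an operation on this channel.
    synchronized Integer compute(String key, BiFunction<String, Integer, Integer> f) {
        return entries.compute(key, f);
    }

    synchronized Integer get(String key) {
        return entries.get(key);
    }

    // Two workers (standing in for two tasks) increment the same key.
    static int runTwoWorkers(int incrementsPerWorker) {
        AtomicChannelSketch channel = new AtomicChannelSketch();
        Runnable worker = () -> {
            for (int i = 0; i < incrementsPerWorker; i++) {
                channel.compute("counter", (k, x) -> x == null ? 1 : x + 1);
            }
        };
        Thread a = new Thread(worker);
        Thread b = new Thread(worker);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        }
        return channel.get("counter");
    }

    public static void main(String[] args) {
        System.out.println(runTwoWorkers(10_000)); // prints 20000
    }
}
```

Without the exclusion, two workers could both read the same old value and write back the same new one, and the final count would fall short.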
In addition to the methods provided by the Map interface, wait methods are
implemented. These make it possible, for example, to block a task until a certain
condition on the map holds.
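As a rough picture of what blocking on a map condition means, the following sketch implements a wait on a local map with a Java monitor. The class `WaitingChannelSketch`, its `waitUntil` signature, and the timeout parameter are assumptions made for this example; the actual wait methods and their signatures are listed in the Synchronization JavaDoc.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for a channel that supports waiting on a condition.
class WaitingChannelSketch {
    private final Map<String, Object> entries = new HashMap<>();

    synchronized void put(String key, Object value) {
        entries.put(key, value);
        notifyAll(); // wake up waiters so they re-check their condition
    }

    // Blocks until the condition holds for the key's current value or the timeout
    // elapses. Returns true if the condition was met, false on timeout.
    synchronized boolean waitUntil(String key, Predicate<Object> condition, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!condition.test(entries.get(key))) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            wait(remaining);
        }
        return true;
    }

    // One thread releases the lock after a short delay while the caller waits for it.
    static boolean demo() {
        WaitingChannelSketch channel = new WaitingChannelSketch();
        channel.put("lock", true);
        Thread unlocker = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            channel.put("lock", false);
        });
        unlocker.start();
        try {
            boolean unlocked = channel.waitUntil("lock", v -> Boolean.FALSE.equals(v), 5_000);
            unlocker.join();
            return unlocked;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true
    }
}
```

The waiter re-evaluates its predicate after every write, so it is released as soon as the stored value satisfies the condition rather than at a fixed polling interval.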
When lambdas are used, they must be passed as string values defining a Groovy
closure.
This ensures language interoperability (a Python script can thus use the
Synchronization API with lambdas).
For example:
{k, x -> x+1}
should be used instead of
(k, x) -> x+1
Documentation:
https://doc.activeeon.com/latest/user/ProActiveUserGuide.html#_task_synchronization_api
JavaDoc for the Synchronization API:
https://doc.activeeon.com/latest/javadoc/org/ow2/proactive/scheduler/synchronization/Synchronization.html
Let's illustrate this mechanism with a simple workflow. Create an empty workflow and
select the Key_Value workflow under Controls->Synchronization API Examples.
The implementation code of the Init task:
// Get the current job id
jobid = variables.get("PA_JOB_ID")
// Create a channel inside the job
synchronizationapi.createChannel(jobid, false)
// Add a lock entry
synchronizationapi.put(jobid, "lock", true)
println "Channel " + jobid + " created."
The selection script of the TaskB_Wait task:
// Wait until lock is false
selected = !(synchronizationapi.get(variables.get("PA_JOB_ID"), "lock"))
The implementation code of TaskA:
println "Sleeping 5 seconds"
Thread.sleep(5000)
println "Unlocking Task B"
// Set the lock to false
synchronizationapi.put(variables.get("PA_JOB_ID"), "lock", false)
The implementation code of the Clean task:
jobid = variables.get("PA_JOB_ID")
// Delete the channel
synchronizationapi.deleteChannel(jobid)
println "Channel " + jobid + " deleted."
Execute the workflow and observe how the tasks are synchronized:
[28t2@trydev.activeeon.com;14:24:22] Channel 28 created.
[28t0@trydev.activeeon.com;14:24:28] Sleeping 5 seconds
[28t0@trydev.activeeon.com;14:24:33] Unlocking Task B
[28t1@trydev.activeeon.com;14:24:40] Task B has been unlocked by Task A
[28t3@trydev.activeeon.com;14:24:46] Channel 28 deleted.
Synchronization API logs are available in the Scheduler Portal, in the Server Logs
panel:
[2018-04-24 14:24:22,685 de72716883 INFO o.o.p.s.u.TaskLogger] task 28t2 (Init)
[Synchronization]Created new memory channel '28'
[2018-04-24 14:24:22,786 de72716883 INFO o.o.p.s.u.TaskLogger] task 28t2 (Init)
[Synchronization][Channel=28] Put true on key 'lock', previous value was null
[2018-04-24 14:24:34,431 de72716883 INFO o.o.p.s.u.TaskLogger] task 28t0 (TaskA)
[Synchronization][Channel=28] Put false on key 'lock', previous value was true
[2018-04-24 14:24:46,882 de72716883 INFO o.o.p.s.u.TaskLogger] task 28t3 (Clean)
[Synchronization]Deleted memory channel '28'