9.3 Thread communication
9.3.1 Message queues
Prolog threads can exchange data using dynamic predicates, database records, and other globally shared data. These provide no suitable means to wait for data or a condition as they can only be checked in an expensive polling loop. Message queues provide a means for threads to wait for data or conditions without using the CPU.
Each thread has a message queue attached to it that is identified by the thread. Additional queues are created using message_queue_create/1. Explicitly created queues come in two flavours. When given an alias, they must be destroyed by the user. Anonymous message queues are identified by a blob (see section 11.4.7) and subject to garbage collection.
- thread_send_message(+QueueOrThreadId, +Term)
- Place Term in the given queue or default queue of the
indicated thread (which can even be the message queue of itself, see
thread_self/1).
Any term can be placed in a message queue, but note that the term is
copied to the receiving thread and variable bindings are thus lost. This
call returns immediately.
If more than one thread is waiting for messages on the given queue and at least one of these is waiting with a partially instantiated Term, the waiting threads are all sent a wake-up signal, starting a rush for the available messages in the queue. This behaviour can seriously harm performance with many threads waiting on the same queue as all-but-the-winner perform a useless scan of the queue. If there is only one waiting thread or all waiting threads wait with an unbound variable, an arbitrary thread is restarted to scan the queue.160See the documentation for the POSIX thread functions pthread_cond_signal() v.s. pthread_cond_broadcast() for background information.
- [semidet]thread_send_message(+Queue, +Term, +Options)
- As thread_send_message/2,
but providing additional Options. These are to deal with the
case that the queue has a finite maximum size and is full: whereas thread_send_message/2
will block until the queue has drained sufficiently to accept a new
message, thread_send_message/3
can accept a time-out or deadline analogously to thread_get_message/3.
The options are:
- deadline(+AbsTime)
- The call fails (silently) if no space has become available before AbsTime. See get_time/1 for the representation of absolute time. If AbsTime is earlier then the current time, thread_send_message/3 fails immediately. Both resolution and maximum wait time is platform-dependent.161The implementation uses MsgWaitForMultipleObjects() on MS-Windows and pthread_cond_timedwait() on other systems.
- timeout(+Time)
- Time is a float or integer and specifies the maximum time to
wait in seconds. This is a relative-time version of the
deadline
option. If both options are provided, the earlier time is effective.If Time is 0 or 0.0, thread_send_message/3 examines the queue and sends the message if space is availabel, but does not suspend if no space is available, failing immediately instead.
If Time < 0, thread_send_message/3 fails immediately without sending the message.
- thread_get_message(?Term)
- Examines the thread message queue and if necessary blocks execution
until a term that unifies to Term arrives in the queue. After
a term from the queue has been unified to Term, the term is
deleted from the queue.
Please note that non-unifying messages remain in the queue. After the following has been executed, thread 1 has the term
b(gnu)
in its queue and continues execution using A =gnat
.<thread 1> thread_get_message(a(A)), <thread 2> thread_send_message(Thread_1, b(gnu)), thread_send_message(Thread_1, a(gnat)),
See also thread_peek_message/1.
- thread_peek_message(?Term)
- Examines the thread message queue and compares the queued terms with Term until one unifies or the end of the queue has been reached. In the first case the call succeeds, possibly instantiating Term. If no term from the queue unifies, this call fails. I.e., thread_peek_message/1 never waits and does not remove any term from the queue. See also thread_get_message/3.
- message_queue_create(?Queue)
- Equivalent to
message_queue_create(Queue,[])
. For compatibility, callingmessage_queue_create(+Atom)
is equivalent tomessage_queue_create(Queue, [alias(Atom)])
. New code should use message_queue_create/2 to create a named queue. - message_queue_create(-Queue, +Options)
- Create a message queue from Options. Defined options are:
- alias(+Alias)
- Create a message queue that is identified by the atom Alias. Message queues created this way must be explicitly destroyed by the user. If the alias option is omitted, an Anonymous queue is created that is indentified by a blob (see section 11.4.7) and subject to garbage collection.162Garbage collecting anonymous message queues is not part of the ISO proposal and most likely not a widely implemented feature.
- max_size(+Size)
- Maximum number of terms in the queue. If this number is reached, thread_send_message/2 will suspend until the queue is drained. The option can be used if the source, sending messages to the queue, is faster than the drain, consuming the messages.
- [det]message_queue_destroy(+Queue)
- Destroy a message queue created with message_queue_create/1. A permission error is raised if Queue refers to (the default queue of) a thread. Other threads that are waiting for Queue using thread_get_message/2 receive an existence error.
- [det]thread_get_message(+Queue, ?Term)
- As thread_get_message/1, operating on a given queue. It is allowed (but not advised) to get messages from the queue of other threads. This predicate raises an existence error exception if Queue doesn't exist or is destroyed using message_queue_destroy/1 while this predicate is waiting.
- [semidet]thread_get_message(+Queue, ?Term, +Options)
- As thread_get_message/2,
but providing additional Options:
- deadline(+AbsTime)
- The call fails (silently) if no message has arrived before AbsTime. See get_time/1 for the representation of absolute time. If AbsTime is earlier then the current time, thread_get_message/3 fails immediately. Both resolution and maximum wait time is platform-dependent.163The implementation uses MsgWaitForMultipleObjects() on MS-Windows and pthread_cond_timedwait() on other systems.
- timeout(+Time)
- Time is a float or integer and specifies the maximum time to
wait in seconds. This is a relative-time version of the
deadline
option. If both options are provided, the earlier time is effective.If Time is 0 or 0.0, thread_get_message/3 examines the queue but does not suspend if no matching term is available. Note that unlike thread_peek_message/2, a matching term is removed from the queue.
If Time < 0, thread_get_message/3 fails immediately without removing any message from the queue.
- [semidet]thread_peek_message(+Queue, ?Term)
- As thread_peek_message/1, operating on a given queue. It is allowed to peek into another thread's message queue, an operation that can be used to check whether a thread has swallowed a message sent to it.
- message_queue_property(?Queue, ?Property)
- True if Property is a property of Queue. Defined
properties are:
- alias(Alias)
- Queue has the given alias name.
- max_size(Size)
- Maximum number of terms that can be in the queue. See message_queue_create/2. This property is not present if there is no limit (default).
- size(Size)
- Queue currently contains Size terms. Note that due to concurrent access the returned value may be outdated before it is returned. It can be used for debugging purposes as well as work distribution purposes.
The
size(Size)
property is always present and may be used to enumerate the created message queues. Note that this predicate does not enumerate threads, but can be used to query the properties of the default queue of a thread.
Explicit message queues are designed with the worker-pool model in mind, where multiple threads wait on a single queue and pick up the first goal to execute. Below is a simple implementation where the workers execute arbitrary Prolog goals. Note that this example provides no means to tell when all work is done. This must be realised using additional synchronisation.
%% create_workers(?Id, +N) % % Create a pool with Id and number of workers. % After the pool is created, post_job/1 can be used to % send jobs to the pool. create_workers(Id, N) :- message_queue_create(Id), forall(between(1, N, _), thread_create(do_work(Id), _, [])). do_work(Id) :- repeat, thread_get_message(Id, Goal), ( catch(Goal, E, print_message(error, E)) -> true ; print_message(error, goal_failed(Goal, worker(Id))) ), fail. %% post_job(+Id, +Goal) % % Post a job to be executed by one of the pool's workers. post_job(Id, Goal) :- thread_send_message(Id, Goal).
9.3.2 Signalling threads
These predicates provide a mechanism to make another thread execute some goal as an interrupt. Signalling threads is safe as these interrupts are only checked at safe points in the virtual machine. Nevertheless, signalling in multithreaded environments should be handled with care as the receiving thread may hold a mutex (see with_mutex/2). Signalling probably only makes sense to start debugging threads and to cancel no-longer-needed threads with throw/1, where the receiving thread should be designed carefully to handle exceptions at any point.
- thread_signal(+ThreadId, :Goal)
- Make thread ThreadId execute Goal at the first
opportunity. In the current implementation, this implies at the first
pass through the Call port. The predicate thread_signal/2
itself places Goal into the signalled thread's signal queue
and returns immediately.
Signals (interrupts) do not cooperate well with the world of multithreading, mainly because the status of mutexes cannot be guaranteed easily. At the call port, the Prolog virtual machine holds no locks and therefore the asynchronous execution is safe.
Goal can be any valid Prolog goal, including throw/1 to make the receiving thread generate an exception, and trace/0 to start tracing the receiving thread.
In the Windows version, the receiving thread immediately executes the signal if it reaches a Windows GetMessage() call, which generally happens if the thread is waiting for (user) input.
9.3.3 Threads and dynamic predicates
Besides queues (section 9.3.1) threads can share and exchange data using dynamic predicates. The multithreaded version knows about two types of dynamic predicates. By default, a predicate declared dynamic (see dynamic/1) is shared by all threads. Each thread may assert, retract and run the dynamic predicate. Synchronisation inside Prolog guarantees the consistency of the predicate. Updates are logical: visible clauses are not affected by assert/retract after a query started on the predicate. In many cases primitives from section 9.4 should be used to ensure that application invariants on the predicate are maintained.
Besides shared predicates, dynamic predicates can be declared with the thread_local/1 directive. Such predicates share their attributes, but the clause list is different in each thread.
- thread_local +Functor/+Arity, ...
- This directive is related to the dynamic/1
directive. It tells the system that the predicate may be modified using assert/1, retract/1,
etc., during execution of the program. Unlike normal shared dynamic
data, however, each thread has its own clause list for the predicate. As
a thread starts, this clause list is empty. If there are still clauses
when the thread terminates, these are automatically reclaimed by the
system (see also volatile/1).
The thread_local property implies the properties dynamic and volatile.
Thread-local dynamic predicates are intended for maintaining thread-specific state or intermediate results of a computation.
It is not recommended to put clauses for a thread-local predicate into a file, as in the example below, because the clause is only visible from the thread that loaded the source file. All other threads start with an empty clause list.
:- thread_local foo/1. foo(gnat).
DISCLAIMER Whether or not this declaration is appropriate in the sense of the proper mechanism to reach the goal is still debated. If you have strong feelings in favour or against, please share them in the SWI-Prolog mailing list.