% Copyright 2005-2017 Cisco Systems, Inc.
%
% Licensed under the Apache License, Version 2.0 (the "License");
% you may not use this file except in compliance with the License.
% You may obtain a copy of the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS,
% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
% See the License for the specific language governing permissions and
% limitations under the License.
\chapter{Thread System\label{CHPTTHREADS}}
\index{threads}This chapter describes the \emph{Chez Scheme} thread-system procedures
and syntactic forms.
With the exception of locks, locked increment, and locked decrement,
the features of the thread system are implemented on top of the Posix
thread system (pthreads) on non-Windows-based systems and directly using
the Windows API on Windows-based systems.
Consult the appropriate documentation on your system for basic details
of thread creation and interaction.
Most primitive Scheme procedures are \index{thread-safe primitives}\emph{thread-safe}, meaning
that they can be called concurrently from multiple threads.
This includes allocation operations like \scheme{cons} and \scheme{make-string},
accessors like \scheme{car} and \scheme{vector-ref},
numeric operators like \scheme{+} and \scheme{sqrt}, and nondestructive
higher-level primitive operators like \scheme{append} and \scheme{map}.
Simple mutation operators, like \scheme{set-car!}, \scheme{vector-set!},
and record field mutators are thread-safe.
Likewise, assignments to local variables, including assignments to
(unexported) library and top-level program variables, are thread-safe.
Other destructive operators are thread-safe only if they are used to
operate on different objects from those being read or modified by other
threads.
For example, assignments to global variables are thread-safe only as
long as one thread does not assign the same variable another thread
references or assigns.
Similarly, \scheme{putprop} can be called in one thread while another
concurrently calls \scheme{putprop} or \scheme{getprop} if the symbols
whose property lists are being modified or accessed differ.
In this context, most I/O operations should be considered destructive,
since they might modify a port's internal structure; see also
Section~\ref{SECTTHREADSBUFFEREDIO} for information on buffered ports.
Use of operators that are not thread-safe without proper synchronization
can corrupt the objects upon which they operate.
This corruption can lead to incorrect behavior, memory faults, and even
unrecoverable errors that cause the system to abort.
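As a minimal sketch of proper synchronization, assuming a threaded build
of the system, a global variable shared by several threads can be guarded
by a mutex (see the section on mutexes later in this chapter) so that it
is never read and assigned concurrently.
The names \scheme{hits}, \scheme{hits-mutex}, \scheme{record-hit!}, and
\scheme{hit-count} are illustrative only and are not part of the system.
\schemedisplay
(define hits 0)                   ; shared global variable (illustrative)
(define hits-mutex (make-mutex))  ; guards every access to hits

(define record-hit!
  (lambda ()
    (with-mutex hits-mutex
      (set! hits (+ hits 1)))))

(define hit-count
  (lambda ()
    (with-mutex hits-mutex hits)))

(record-hit!)
(hit-count) ;=> 1
\endschemedisplay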
The compiler and interpreter are thread-safe to the extent that user code
evaluated during the compilation and evaluation process is thread-safe or
properly synchronized.
Thus, two or more threads
can call any of the compiler or interpreter entry points, i.e.,
\scheme{compile}, \scheme{compile-file}, \scheme{compile-program}, \scheme{compile-script},
\scheme{compile-port}, or \scheme{interpret} at the same time.
Naturally, the object-file targets of two file compilation operations that
run at the same time should be different.
The same is true for \scheme{eval} and \scheme{load} as long as
the default evaluator is used or is set explicitly to \scheme{compile},
\scheme{interpret}, or some other thread-safe evaluator.
One restriction should be observed, however, when one of multiple threads
creates or loads compiled code: only that thread or its subsequently
created descendants (children, children of those children, etc.)
should run the code.
This is because multiple-processor systems upon which threaded code may
run might not guarantee that the data and instruction caches are
synchronized across processors.
\section{Thread Creation}
%----------------------------------------------------------------------------
\noskipentryheader
\formdef{fork-thread}{\categoryprocedure}{(fork-thread \var{thunk})}
\returns a thread object
\listlibraries
\endnoskipentryheader
\noindent
\var{thunk} must be a procedure that accepts zero arguments.
\scheme{fork-thread} invokes \var{thunk} in a new thread and returns
a thread object.
Nothing can be done with the thread object returned by
\scheme{fork-thread}, other than to print it.
Threads created by foreign code using some means other than
\scheme{fork-thread} must call \scheme{Sactivate_thread}
(Section~\ref{SECTFOREIGNCLIB}) before touching any Scheme data
or calling any Scheme procedures.
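The following short sketch, which assumes a threaded build of the system,
forks a thread that prints its own thread id.
The variable name \scheme{t} is illustrative, and the id that is printed
varies from run to run.
\schemedisplay
(define t
  (fork-thread
    (lambda ()
      (printf "hello from thread ~s\n" (get-thread-id)))))
(thread? t) ;=> #t
\endschemedisplay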
%----------------------------------------------------------------------------
\entryheader
\formdef{thread?}{\categoryprocedure}{(thread? \var{obj})}
\returns \scheme{#t} if \var{obj} is a thread object, \scheme{#f} otherwise
\listlibraries
\endentryheader
%----------------------------------------------------------------------------
\entryheader
\formdef{get-thread-id}{\categoryprocedure}{(get-thread-id)}
\returns the thread id of the current thread
\listlibraries
\endentryheader
The thread id is a thread number assigned by the thread system, and has no
relationship to the process id returned by
\index{\scheme{get-process-id}}\scheme{get-process-id}, which is the same
in all threads.
\section{Mutexes}
%----------------------------------------------------------------------------
\noskipentryheader
\formdef{make-mutex}{\categoryprocedure}{(make-mutex)}
\formdef{make-mutex}{\categoryprocedure}{(make-mutex \var{name})}
\returns a new mutex object
\listlibraries
\endnoskipentryheader
\noindent
\var{name}, if supplied, must be a symbol which identifies the mutex, or
\scheme{#f} for no name. The name is printed every time the mutex is
printed, which is useful for debugging.
%----------------------------------------------------------------------------
\entryheader
\formdef{mutex?}{\categoryprocedure}{(mutex? \var{obj})}
\returns \scheme{#t} if \var{obj} is a mutex, \scheme{#f} otherwise
\listlibraries
\endentryheader
%----------------------------------------------------------------------------
\entryheader
\formdef{mutex-acquire}{\categoryprocedure}{(mutex-acquire \var{mutex})}
\formdef{mutex-acquire}{\categoryprocedure}{(mutex-acquire \var{mutex} \var{block?})}
\returns see below
\listlibraries
\endentryheader
\noindent
\var{mutex} must be a mutex.
\scheme{mutex-acquire} acquires the mutex identified by \var{mutex}.
The optional boolean argument \var{block?} defaults to
\scheme{#t} and specifies whether the thread should block
waiting for the mutex.
If \var{block?} is omitted or is true, the thread
blocks until the mutex has been acquired, and an unspecified
value is returned.
If \var{block?} is false and the mutex currently belongs
to a different thread, the current thread does not block.
Instead, \scheme{mutex-acquire} returns
immediately with the value \scheme{#f} to
indicate that the mutex is not available.
If \var{block?} is false and the mutex is successfully
acquired, \scheme{mutex-acquire} returns \scheme{#t}.
Mutexes are \emph{recursive} in Posix threads terminology, which
means that the calling thread can use \scheme{mutex-acquire} to
(re)acquire a mutex it already has.
In this case, an equal number of \scheme{mutex-release} calls
is necessary to release the mutex.
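The following sketch illustrates blocking, nonblocking, and recursive
acquisition from a single thread.
The mutex name \scheme{demo} and the variable \scheme{m} are illustrative
only.
\schemedisplay
(define m (make-mutex 'demo))
(mutex-acquire m)     ; blocks until m has been acquired
(mutex-acquire m #f)  ;=> #t, the owner reacquires m without blocking
(mutex-release m)     ; m is still held once by this thread
(mutex-release m)     ; m is now available to other threads
\endschemedisplay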
%----------------------------------------------------------------------------
\entryheader
\formdef{mutex-release}{\categoryprocedure}{(mutex-release \var{mutex})}
\returns unspecified
\listlibraries
\endentryheader
\noindent
\var{mutex} must be a mutex.
\scheme{mutex-release} releases the mutex identified by \var{mutex}.
Unpredictable behavior results if the mutex is not owned by the
calling thread.
%----------------------------------------------------------------------------
\entryheader
\formdef{with-mutex}{\categorysyntax}{(with-mutex \var{mutex} \var{body_1} \var{body_2} \dots)}
\returns the values of the body \scheme{\var{body_1} \var{body_2} \dots}
\listlibraries
\endentryheader
\noindent
\scheme{with-mutex} evaluates the expression \var{mutex}, which must
evaluate to a mutex, acquires the mutex, evaluates the body
\scheme{\var{body_1} \var{body_2} \dots}, and releases the mutex.
The mutex is released whether the body returns normally or
via a control operation (that is, throw to a continuation, perhaps because
of an error) that results in
a nonlocal exit from the \scheme{with-mutex} form.
If control subsequently returns to the body via a
continuation invocation, the mutex is reacquired.
Using \scheme{with-mutex} is generally more convenient and safer than using
\scheme{mutex-acquire} and \scheme{mutex-release} directly.
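The release-on-exit and reacquire-on-reentry behavior described above
resembles that of \scheme{dynamic-wind}.
The sketch below is an illustration under that assumption and is not
necessarily the actual expansion of \scheme{with-mutex}; the names
\scheme{balance}, \scheme{deposit!}, and \scheme{deposit/wind!} are
hypothetical.
\schemedisplay
(define m (make-mutex))
(define balance 0)

(define deposit!
  (lambda (n)
    (with-mutex m
      (set! balance (+ balance n))
      balance)))

; roughly the same effect, written out by hand
(define deposit/wind!
  (lambda (n)
    (dynamic-wind
      (lambda () (mutex-acquire m))
      (lambda ()
        (set! balance (+ balance n))
        balance)
      (lambda () (mutex-release m)))))

(deposit! 10)      ;=> 10
(deposit/wind! 5)  ;=> 15
\endschemedisplay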
%----------------------------------------------------------------------------
\entryheader
\formdef{mutex-name}{\categoryprocedure}{(mutex-name \var{mutex})}
\returns the name associated with \var{mutex}, if any; otherwise \scheme{#f}
\listlibraries
\endentryheader
\noindent
\var{mutex} must be a mutex.
\section{Conditions}
%----------------------------------------------------------------------------
\noskipentryheader
\formdef{make-condition}{\categoryprocedure}{(make-condition)}
\formdef{make-condition}{\categoryprocedure}{(make-condition \var{name})}
\returns a new condition object
\listlibraries
\endnoskipentryheader
\noindent
\var{name}, if supplied, must be a symbol which identifies the condition
object, or \scheme{#f} for no name. The name is printed every time the
condition is printed, which is useful for debugging.
%----------------------------------------------------------------------------
\entryheader
\formdef{thread-condition?}{\categoryprocedure}{(thread-condition? \var{obj})}
\returns \scheme{#t} if \var{obj} is a condition object, \scheme{#f} otherwise
\listlibraries
\endentryheader
%----------------------------------------------------------------------------
\entryheader
\formdef{condition-wait}{\categoryprocedure}{(condition-wait \var{cond} \var{mutex})}
\formdef{condition-wait}{\categoryprocedure}{(condition-wait \var{cond} \var{mutex} \var{timeout})}
\returns \scheme{#t} if the calling thread was awakened by the condition, \scheme{#f} if the calling thread timed out waiting
\listlibraries
\endentryheader
\noindent
\var{cond} must be a condition object, and
\var{mutex} must be a mutex.
The optional argument \var{timeout} is a time record of type
\scheme{time-duration} or \scheme{time-utc}, or \scheme{#f} for no
timeout. It defaults to \scheme{#f}.
\scheme{condition-wait} waits up to the specified \var{timeout} for
the condition identified by the condition object \var{cond}.
The calling thread must have acquired the mutex identified by the mutex
\var{mutex} at the time \scheme{condition-wait} is
called.
\var{mutex} is released as a side effect of the call to
\scheme{condition-wait}.
When a thread is later released from the condition variable by one of
the procedures described below or the timeout expires, \var{mutex} is
reacquired and \scheme{condition-wait} returns.
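As a small sketch of the \var{timeout} argument, the following waits for
up to one second on a condition that no other thread signals.
It assumes the \scheme{make-time} argument order of type, nanoseconds,
and seconds; the names \scheme{m} and \scheme{c} are illustrative.
\schemedisplay
(define m (make-mutex))
(define c (make-condition))
(with-mutex m
  ; normally returns #f after about one second, since nothing signals c
  (condition-wait c m (make-time 'time-duration 0 1)))
\endschemedisplay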
%----------------------------------------------------------------------------
\entryheader
\formdef{condition-signal}{\categoryprocedure}{(condition-signal \var{cond})}
\returns unspecified
\listlibraries
\endentryheader
\noindent
\var{cond} must be a condition object.
\scheme{condition-signal} releases one of the threads waiting for the
condition identified by \var{cond}.
%----------------------------------------------------------------------------
\entryheader
\formdef{condition-broadcast}{\categoryprocedure}{(condition-broadcast \var{cond})}
\returns unspecified
\listlibraries
\endentryheader
\noindent
\var{cond} must be a condition object.
\scheme{condition-broadcast} releases all of the threads waiting for the
condition identified by \var{cond}.
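Since a thread object cannot be waited on directly, one common pattern
is for a forked thread to announce completion through a condition.
The sketch below shows one way to do this; the names \scheme{m},
\scheme{c}, \scheme{done?}, and \scheme{result} are illustrative.
The waiting thread rechecks \scheme{done?} in a loop, so it behaves
correctly whether the signal arrives before or after it starts waiting.
\schemedisplay
(define m (make-mutex))
(define c (make-condition))
(define done? #f)
(define result #f)

(fork-thread
  (lambda ()
    (let ([v (* 6 7)])       ; stand-in for real work
      (with-mutex m
        (set! result v)
        (set! done? #t)
        (condition-signal c)))))

(with-mutex m
  (let loop ()
    (unless done?
      (condition-wait c m)
      (loop)))
  result) ;=> 42
\endschemedisplay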
%----------------------------------------------------------------------------
\entryheader
\formdef{condition-name}{\categoryprocedure}{(condition-name \var{condition})}
\returns the name associated with \var{condition}, if any; otherwise \scheme{#f}
\listlibraries
\endentryheader
\noindent
\var{condition} must be a condition.
\section{Locks\label{SECTTHREADLOCKS}}
\index{locks}%
Locks are more primitive but more flexible and efficient than mutexes
and can be used in situations where the added mutex functionality
is not needed or desired.
They can also be used independently of the thread system
(including in nonthreaded versions of {\ChezScheme})
to synchronize operations running in separate Scheme processes
as long as the lock is allocated in memory shared by the processes.
A lock is simply a word-sized integer, i.e., an \scheme{iptr} or
\scheme{uptr} foreign type (Section~\ref{SECTFOREIGNDATA}) with the native
endianness of the target machine, possibly part of a larger structure
defined using \scheme{define-ftype} (page~\pageref{defn:define-ftype}).
It must be explicitly allocated in memory that resides outside the Scheme
heap and, when appropriate, explicitly deallocated.
When just threads are involved (i.e., when multiple processes are not
involved), the memory can be allocated via \scheme{foreign-alloc}.
When multiple processes are involved, the lock should be allocated in
some area shared by the processes that will interact with the lock.
Once initialized using \scheme{ftype-init-lock!}, a process or thread
can attempt to lock the lock via \scheme{ftype-lock!} or \scheme{ftype-spin-lock!}.
Once the lock has been locked and before it is unlocked, further
attempts to lock the lock fail, even by the process or thread that
most recently locked it.
Locks can be unlocked, via \scheme{ftype-unlock!}, by any process or thread,
not just by the process or thread that most recently locked the lock.
The lock mechanism provides little structure, and mistakes
in allocation and use can lead to memory faults, deadlocks,
and other problems.
Thus, it is usually advisable to use locks only as part of a
higher-level abstraction that ensures locks are used in a
disciplined manner.
\schemedisplay
(define lock
  (make-ftype-pointer uptr
    (foreign-alloc (ftype-sizeof uptr))))
(ftype-init-lock! uptr () lock)
(ftype-lock! uptr () lock) ;=> #t
(ftype-lock! uptr () lock) ;=> #f
(ftype-unlock! uptr () lock)
(ftype-spin-lock! uptr () lock)
(ftype-lock! uptr () lock) ;=> #f
(ftype-unlock! uptr () lock)
\endschemedisplay
\entryheader
\formdef{ftype-init-lock!}{\categorysyntax}{(ftype-init-lock! \var{ftype-name} (\var{a} ...) \var{fptr-expr})}
\formdef{ftype-init-lock!}{\categorysyntax}{(ftype-init-lock! \var{ftype-name} (\var{a} ...) \var{fptr-expr} \var{index})}
\returns unspecified
\formdef{ftype-lock!}{\categorysyntax}{(ftype-lock! \var{ftype-name} (\var{a} ...) \var{fptr-expr})}
\formdef{ftype-lock!}{\categorysyntax}{(ftype-lock! \var{ftype-name} (\var{a} ...) \var{fptr-expr} \var{index})}
\returns \scheme{#t} if the lock is not already locked, \scheme{#f} otherwise
\formdef{ftype-spin-lock!}{\categorysyntax}{(ftype-spin-lock! \var{ftype-name} (\var{a} ...) \var{fptr-expr})}
\formdef{ftype-spin-lock!}{\categorysyntax}{(ftype-spin-lock! \var{ftype-name} (\var{a} ...) \var{fptr-expr} \var{index})}
\returns unspecified
\formdef{ftype-unlock!}{\categorysyntax}{(ftype-unlock! \var{ftype-name} (\var{a} ...) \var{fptr-expr})}
\formdef{ftype-unlock!}{\categorysyntax}{(ftype-unlock! \var{ftype-name} (\var{a} ...) \var{fptr-expr} \var{index})}
\returns unspecified
\listlibraries
\endentryheader
Each of these has a syntax like and behaves similarly to
\scheme{ftype-set!} (page~\pageref{defn:ftype-set!}), though with an implicit
\var{val-expr}.
In particular, the restrictions on and handling of \var{fptr-expr}
and the accessors \scheme{\var{a} \dots} are similar, with one important
restriction: the field specified by the last accessor, upon which
the form operates, must be a word-size integer, i.e., an
\scheme{iptr}, \scheme{uptr}, or the equivalent, with the native
endianness.
\scheme{ftype-init-lock!} should be used to initialize the lock prior
to the use of any of the other operators; if this is not done, the
behavior of the other operators is undefined.
\scheme{ftype-lock!} can be used to lock the lock.
If it finds the lock unlocked at the time of the operation, it locks
the lock and returns \scheme{#t}; if it finds the lock already locked,
it returns \scheme{#f} without changing the lock.
\scheme{ftype-spin-lock!} can also be used to lock the lock.
If it finds the lock unlocked at the time of the operation, it locks the
lock and returns; if it finds the lock already locked, it waits until
the lock is unlocked, then locks the lock and returns.
If no other thread or process unlocks the lock, the operation does
not return and cannot be interrupted by normal means, including by the
storage manager for the purpose of initiating a garbage collection.
There are also no guarantees of fairness, so a process might hang
indefinitely even if other processes are actively locking and unlocking
the lock.
\scheme{ftype-unlock!} is used to unlock a lock.
If it finds the lock locked, it unlocks the lock and returns.
Otherwise, it returns without changing the lock.
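One way to impose some discipline on lock usage is to wrap the lock and
unlock operations in a syntactic form.
The following is a sketch only: \scheme{with-spin-lock} and
\scheme{counter-lock} are hypothetical names, the lock is assumed to be
a bare \scheme{uptr}, and the body is assumed to be short, since a
thread spinning on the lock cannot be interrupted.
\schemedisplay
(define-syntax with-spin-lock
  (syntax-rules ()
    [(_ lock-expr body1 body2 ...)
     (let ([lock lock-expr])
       (dynamic-wind
         (lambda () (ftype-spin-lock! uptr () lock))
         (lambda () body1 body2 ...)
         (lambda () (ftype-unlock! uptr () lock))))]))

(define counter-lock
  (make-ftype-pointer uptr
    (foreign-alloc (ftype-sizeof uptr))))
(ftype-init-lock! uptr () counter-lock)

(with-spin-lock counter-lock
  (ftype-lock! uptr () counter-lock)) ;=> #f, held inside the body
(ftype-lock! uptr () counter-lock)    ;=> #t, released on exit
(ftype-unlock! uptr () counter-lock)
\endschemedisplay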
\section{Locked increment and decrement\label{SECTTHREADLOCKEDINCRDECR}}
The locked operations described here can be used when just an atomic
increment or decrement is required.
\entryheader
\formdef{ftype-locked-incr!}{\categorysyntax}{(ftype-locked-incr! \var{ftype-name} (\var{a} ...) \var{fptr-expr})}
\formdef{ftype-locked-incr!}{\categorysyntax}{(ftype-locked-incr! \var{ftype-name} (\var{a} ...) \var{fptr-expr} \var{index})}
\returns \scheme{#t} if the updated value is 0, \scheme{#f} otherwise
\formdef{ftype-locked-decr!}{\categorysyntax}{(ftype-locked-decr! \var{ftype-name} (\var{a} ...) \var{fptr-expr})}
\formdef{ftype-locked-decr!}{\categorysyntax}{(ftype-locked-decr! \var{ftype-name} (\var{a} ...) \var{fptr-expr} \var{index})}
\returns \scheme{#t} if the updated value is 0, \scheme{#f} otherwise
\listlibraries
\endentryheader
Each of these has a syntax like and behaves similarly to
\scheme{ftype-set!} (page~\pageref{defn:ftype-set!}), though with an implicit
\var{val-expr}.
In particular, the restrictions on and handling of \var{fptr-expr}
and the accessors \scheme{\var{a} \dots} are similar, with one important
restriction: the field specified by the last accessor, upon which
the form operates, must be a word-size integer, i.e., an
\scheme{iptr}, \scheme{uptr}, or the equivalent, with the native
endianness.
\scheme{ftype-locked-incr!} atomically reads the value of the specified
field, adds $1$ to the value, and writes the new value back into the
field.
Similarly, \scheme{ftype-locked-decr!} atomically reads the value of
the specified field, subtracts $1$ from the value, and writes the new
value back into the field.
Both return \scheme{#t} if the new value is 0, otherwise \scheme{#f}.
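The following sketch exercises both operations on a single word
allocated outside the Scheme heap.
The variable name \scheme{counter} is illustrative.
\schemedisplay
(define counter
  (make-ftype-pointer iptr
    (foreign-alloc (ftype-sizeof iptr))))
(ftype-set! iptr () counter 0)
(ftype-locked-incr! iptr () counter) ;=> #f, the value is now 1
(ftype-locked-decr! iptr () counter) ;=> #t, the value is now 0
(ftype-ref iptr () counter)          ;=> 0
\endschemedisplay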
\section{Reference counting with ftype guardians\label{SECTTHREADFTYPEGUARDIANS}}
\index{\scheme{ftype-guardian}}%
Applications that manage memory outside the Scheme heap can leverage
the Scheme storage management system to help perform reference
counting via \emph{ftype guardians}.
In a reference-counted memory management system, each object holds
a count of pointers to it.
The count is incremented when a new pointer is created and decremented
when a pointer is dropped.
When the count reaches zero, the object is no longer needed and the
memory it formerly occupied can be made available for some other
purpose.
Ftype guardians are similar to guardians created by
\index{\scheme{make-guardian}}\scheme{make-guardian}
(Section~\ref{SECTGUARDWEAKPAIRS}).
The \index{\scheme{guardian?}}\scheme{guardian?} procedure returns
true for both, and the
\index{\scheme{unregister-guardian}}\scheme{unregister-guardian}
procedure can be used to unregister objects registered with either.
\entryheader
\formdef{ftype-guardian}{\categorysyntax}{(ftype-guardian \var{ftype-name})}
\returns a new ftype guardian
\listlibraries
\endentryheader
\var{ftype-name} must name an ftype.
The first base field of the ftype (or one of the first base fields
in the case of unions) must be a word-sized integer (iptr or uptr)
with native endianness.
This field is assumed to hold a reference count.
The return value is a new ftype guardian \var{g}, with which
ftype-pointers of type \var{ftype-name} (or some subtype of
\var{ftype-name}) can be registered.
An ftype pointer is registered with \var{g} by invoking \var{g}
with the ftype pointer as an argument.
An ftype guardian does not automatically protect from collection
the ftype pointers registered with it, as a normal guardian would
do.
Instead, for each registered ftype pointer that becomes inaccessible
via normal (non-weak, non-guardian) pointers, the guardian decrements
the reference count of the object to which the ftype pointer points.
If the resulting reference-count value is zero, the ftype pointer
is preserved and can be retrieved from the guardian.
If the resulting reference-count value is non-zero, however, the
ftype pointer is not preserved.
Objects retrieved from an ftype guardian (by calling it without
arguments) are guaranteed to have zero reference counts, assuming
reference counts are maintained properly by code outside the
collector.
The collector decrements the reference count using the equivalent
of \index{\scheme{ftype-locked-decr!}}\scheme{ftype-locked-decr!}
to support systems in which non-Scheme objects are stored in memory
shared by multiple processes.
In such systems, programs should themselves use
\index{\scheme{ftype-locked-incr!}}\scheme{ftype-locked-incr!} and
\scheme{ftype-locked-decr!} or non-Scheme equivalents (e.g., the C
\index{\scheme{LOCKED_INCR}}\scheme{LOCKED_INCR} and
\index{\scheme{LOCKED_DECR}}\scheme{LOCKED_DECR} macros in scheme.h,
which are described in Section~\ref{SECTFOREIGNCLIB}) to maintain
reference counts.
The following example defines a simple ftype and an allocator for
objects of that ftype that frees any objects of that ftype that were
previously allocated and no longer accessible.
\schemedisplay
(module (A make-A free-dropped-As)
  (define-ftype A
    (struct
      [refcount uptr]
      [data int]))
  (define g (ftype-guardian A))
  (define free-dropped-As
    (lambda ()
      (let ([a (g)])
        (when a
          (printf "freeing ~s\n" (ftype-ref A (data) a))
          (foreign-free (ftype-pointer-address a))
          (free-dropped-As)))))
  (define make-A
    (lambda (n)
      (free-dropped-As)
      (let ([a (make-ftype-pointer A (foreign-alloc (ftype-sizeof A)))])
        (ftype-set! A (refcount) a 1)
        (ftype-set! A (data) a n)
        (g a)
        a))))
\endschemedisplay
We can test this by allocating, dropping, and immediately collecting
ftype pointers to A.
\schemedisplay
> (do ([i 10 (fx- i 1)])
      ((fx= i 0))
    (make-A i)
    (collect))
freeing 10
freeing 9
freeing 8
freeing 7
freeing 6
freeing 5
freeing 4
freeing 3
freeing 2
> (free-dropped-As)
freeing 1
\endschemedisplay
Objects guarded by an ftype guardian might contain pointers to other
objects whose reference counts should also be incremented upon
allocation of the containing object and decremented upon freeing
of the containing object.
\section{Thread Parameters\label{SECTTHREADPARAMETERS}}
%----------------------------------------------------------------------------
\noskipentryheader
\formdef{make-thread-parameter}{\categoryprocedure}{(make-thread-parameter \var{object})}
\formdef{make-thread-parameter}{\categoryprocedure}{(make-thread-parameter \var{object} \var{procedure})}
\returns a new thread parameter
\listlibraries
\endnoskipentryheader
\noindent
See Section~\ref{SECTPARAMETERS} for a general
discussion of parameters and the use of the optional second argument.
When a thread parameter is created, a separate location is set aside
in each current and future thread to hold the value of the parameter's
internal state variable.
(This location may be eliminated by the storage manager when the
parameter becomes inaccessible.)
Changes to the thread parameter in one thread are not seen by any
other thread.
When a new thread is created (see \scheme{fork-thread}),
the current value (not location) of each
thread parameter is inherited from the forking thread by the new thread.
Similarly, when a thread created by some other means is activated for the
first time (see \scheme{Sactivate_thread} in
Section~\ref{SECTFOREIGNCLIB}), the current value (not location) of each
thread parameter is inherited from the main (original) thread by the new
thread.
Most built-in parameters are thread parameters, but some are global.
All are marked as global or thread where they are defined.
There is no distinction between built-in global and thread parameters
in the nonthreaded versions of the system.
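The inheritance and isolation rules can be seen in the following sketch,
which assumes a threaded build of the system.
The names \scheme{p}, \scheme{m}, \scheme{c}, and \scheme{seen} are
illustrative; the mutex and condition serve only to let the original
thread wait for the child to report what it observed.
\schemedisplay
(define p (make-thread-parameter 0))
(define m (make-mutex))
(define c (make-condition))
(define seen #f)

(p 1)                          ; set in the forking thread
(fork-thread
  (lambda ()
    (let ([inherited (p)])     ; value inherited at fork time
      (p 2)                    ; change is local to the child
      (with-mutex m
        (set! seen (list inherited (p)))
        (condition-signal c)))))

(with-mutex m
  (let loop ()
    (unless seen
      (condition-wait c m)
      (loop))))
seen ;=> (1 2)
(p)  ;=> 1
\endschemedisplay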
\section{Buffered I/O\label{SECTTHREADSBUFFEREDIO}}
Chez Scheme buffers file I/O operations for efficiency, but buffered
I/O is not thread safe.
Two threads that write to or read from the same buffered port concurrently
can corrupt the port, resulting in buffer overruns and, ultimately,
invalid memory references.
Buffering on binary output ports can be disabled by opening them with
buffer-mode \scheme{none}.
Buffering on input ports cannot be completely disabled, however, due to
the need to support lookahead, and buffering on textual ports, even
textual output ports, cannot be disabled completely because the
transcoders that convert between characters and bytes sometimes
require some lookahead.
Two threads should thus \emph{never} read from or write to the same port
concurrently, except in the special case of a binary output port
opened with buffer-mode \scheme{none}.
Alternatives include appointing one thread to perform all I/O for a
given port and providing a per-thread generic-port wrapper that
forwards requests to the port only after acquiring a mutex.
The initial console and current input and output ports are thread-safe,
as are transcript ports, so it is safe for multiple threads to print error
and/or debugging messages to the console.
The output may be interleaved, even within the same line, but the port
will not become corrupted.
Thread safety for these ports is accomplished at the high cost of
acquiring a mutex for each I/O operation.
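A simpler variant of the mutex-based alternative mentioned above is to
route all output to a shared port through a single procedure that
acquires a mutex first.
The following is a sketch under that assumption; the file name
\scheme{"threads.log"} and the names \scheme{log-mutex},
\scheme{log-port}, and \scheme{log-line} are hypothetical.
It is safe only if no thread touches \scheme{log-port} by any other route.
\schemedisplay
(define log-mutex (make-mutex))
(define log-port
  (open-file-output-port "threads.log"
    (file-options no-fail)
    (buffer-mode block)
    (native-transcoder)))

(define log-line
  (lambda (fmt . args)
    (with-mutex log-mutex
      (apply fprintf log-port fmt args)
      (newline log-port)
      (flush-output-port log-port))))

(log-line "thread ~s started" (get-thread-id))
\endschemedisplay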
\section{Example: Bounded Queues}
The following code, taken from the article
``A Scheme for native threads~\cite{Dybvig:mitchfest-threads},''
implements a bounded queue using many of the
thread-system features.
A bounded queue has a fixed number of available slots.
Attempting to enqueue when the queue is full causes the calling thread
to block.
Attempting to dequeue from an empty queue causes the calling thread
to block.
%%% from thread article
\schemedisplay
(define-record-type bq
  (fields
    (immutable data)
    (mutable head)
    (mutable tail)
    (immutable mutex)
    (immutable ready)
    (immutable room))
  (protocol
    (lambda (new)
      (lambda (bound)
        (new (make-vector bound) 0 0 (make-mutex)
          (make-condition) (make-condition))))))

(define dequeue!
  (lambda (q)
    (with-mutex (bq-mutex q)
      (let loop ()
        (let ([head (bq-head q)])
          (cond
            [(= head (bq-tail q))
             (condition-wait (bq-ready q) (bq-mutex q))
             (loop)]
            [else
             (bq-head-set! q (incr q head))
             (condition-signal (bq-room q))
             (vector-ref (bq-data q) head)]))))))

(define enqueue!
  (lambda (item q)
    (with-mutex (bq-mutex q)
      (let loop ()
        (let* ([tail (bq-tail q)] [tail^ (incr q tail)])
          (cond
            [(= tail^ (bq-head q))
             (condition-wait (bq-room q) (bq-mutex q))
             (loop)]
            [else
             (vector-set! (bq-data q) tail item)
             (bq-tail-set! q tail^)
             (condition-signal (bq-ready q))]))))))

(define incr
  (lambda (q i)
    (modulo (+ i 1) (vector-length (bq-data q)))))
\endschemedisplay
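As a quick single-threaded check of the definitions above, the following
sketch enqueues and dequeues two items; the variable name \scheme{q} is
illustrative.
Because the queue is treated as full when advancing the tail would make
it equal to the head, a queue created with bound $n$ holds at most $n-1$
items, so neither call to \scheme{enqueue!} blocks here.
\schemedisplay
(define q (make-bq 4))
(enqueue! 'a q)
(enqueue! 'b q)
(dequeue! q) ;=> a
(dequeue! q) ;=> b
\endschemedisplay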
\noindent
The code below demonstrates the use of the bounded queue abstraction
with a set of threads that act as consumers and producers of the
data in the queue.
\schemedisplay
(define job-queue)
(define die? #f)

(define make-job
  (let ([count 0])
    (define fib
      (lambda (n)
        (if (< n 2)
            n
            (+ (fib (- n 2)) (fib (- n 1))))))
    (lambda (n)
      (set! count (+ count 1))
      (printf "Adding job #~s = (lambda () (fib ~s))\n" count n)
      (cons count (lambda () (fib n))))))

(define make-producer
  (lambda (n)
    (rec producer
      (lambda ()
        (printf "producer ~s posting a job\n" n)
        (enqueue! (make-job (+ 20 (random 10))) job-queue)
        (if die?
            (printf "producer ~s dying\n" n)
            (producer))))))

(define make-consumer
  (lambda (n)
    (rec consumer
      (lambda ()
        (printf "consumer ~s looking for a job~%" n)
        (let ([job (dequeue! job-queue)])
          (if die?
              (printf "consumer ~s dying\n" n)
              (begin
                (printf "consumer ~s executing job #~s~%" n (car job))
                (printf "consumer ~s computed: ~s~%" n ((cdr job)))
                (consumer))))))))

(define (bq-test np nc)
  (set! job-queue (make-bq (max nc np)))
  (do ([np np (- np 1)])
      ((<= np 0))
    (fork-thread (make-producer np)))
  (do ([nc nc (- nc 1)])
      ((<= nc 0))
    (fork-thread (make-consumer nc))))
\endschemedisplay
\noindent
Here are the first several lines of output from one possible run of the example program.
\schemedisplay
> (begin
    (bq-test 3 4)
    (system "sleep 3")
    (set! die? #t))
producer 3 posting a job
Adding job #1 = (lambda () (fib 29))
producer 3 posting a job
Adding job #2 = (lambda () (fib 26))
producer 3 posting a job
Adding job #3 = (lambda () (fib 22))
producer 3 posting a job
Adding job #4 = (lambda () (fib 21))
producer 2 posting a job
Adding job #5 = (lambda () (fib 29))
producer 1 posting a job
Adding job #6 = (lambda () (fib 29))
consumer 4 looking for a job
producer 3 posting a job
Adding job #7 = (lambda () (fib 24))
consumer 4 executing job #1
consumer 3 looking for a job
producer 2 posting a job
Adding job #8 = (lambda () (fib 26))
consumer 3 executing job #2
consumer 3 computed: 121393
consumer 3 looking for a job
producer 1 posting a job
Adding job #9 = (lambda () (fib 26))
...
\endschemedisplay
Additional examples, including definitions of suspendable threads and
threads that automatically terminate when they become inaccessible, are
given in ``A Scheme for native threads~\cite{Dybvig:mitchfest-threads}.''
% \section{Thread System OOP Interface}
%
% The thread system OOP interface consists of one new form,
% \scheme{define-threaded-class}.
% This form provides a high-level interface for acquiring mutexes
% and waiting for conditions.
% A \scheme{define-threaded-class} form has the following general
% syntax:
%
% \schemedisplay
% (define-threaded-class (\var{name} \var{fmls}) (\var{parent} \var{expr} \dots)
% (state [\var{ivar} \var{init}] \dots)
% (init \var{expr} \dots)
% (conditions
% [\var{cname} \var{pred}]
% \dots)
% (methods
% [locked \var{mname} \var{mfmls} \var{body}]
% \dots))
% \endschemedisplay
%
% \noindent
% Each of the labeled sections (\scheme{state}, \scheme{init},
% \scheme{conditions}, and \scheme{methods}) is optional, as is
% the \scheme{locked} keyword.
% The \scheme{locked} keyword may be applied to all, none, or
% some of the methods in a threaded-class definition.
%
% The \scheme{conditions} subform and the \scheme{locked} keyword are
% extensions to the \scheme{define-class} syntax.
% If no \scheme{conditions} subform is given and no \scheme{locked} keywords are
% present, \scheme{define-threaded-class} is identical to
% \scheme{define-class}, both in syntax and semantics.
%
% If any methods are annotated with the \scheme{locked} keyword, a
% mutex is associated with each instance of the class, and those
% methods automatically acquire and release the mutex as if the
% body were wrapped in a \scheme{with-mutex} form.
% The following definition of a \scheme{stack} class demonstrates
% the \scheme{locked} keyword.
%
% \schemedisplay
% (define-threaded-class (<stack>) (<base>)
% (state [pdl '()])
% (methods
% [locked push (v)
% (set! pdl (cons v pdl))]
% [locked pop (default)
% (if (null? pdl)
% default
% (let ([v (car pdl)])
% (set! pdl (cdr pdl))
% v))]))
% \endschemedisplay
%
% \noindent
% The \scheme{push} method adds an item to the top of the stack.
% The \scheme{pop} method removes an item from the top of the
% stack and returns it, unless the stack is empty, in which case
% it returns the default value passed in by the caller.
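%
% The effect of the \scheme{locked} keyword can be approximated without
% the class system.
% The following is only a sketch, assuming a threaded build of
% \emph{Chez Scheme}.
% The names \scheme{stack-mutex}, \scheme{stack-push!}, and
% \scheme{stack-pop!} are hypothetical, and a single global stack stands
% in for an instance of the class.
%
% \schemedisplay
% ;; hypothetical stand-ins for the per-instance mutex and state
% (define stack-mutex (make-mutex))
% (define pdl '())
%
% ;; each operation holds the mutex for its entire body
% (define (stack-push! v)
%   (with-mutex stack-mutex
%     (set! pdl (cons v pdl))))
%
% (define (stack-pop! default)
%   (with-mutex stack-mutex
%     (if (null? pdl)
%         default
%         (let ([v (car pdl)])
%           (set! pdl (cdr pdl))
%           v))))
% \endschemedisplay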
%
% This may seem like an unnecessarily complex version of \scheme{pop}.
% A simpler and more familiar approach would be to provide an
% \scheme{empty?} method for determining if the stack contains any items
% and to remove this test from \scheme{pop} as follows.
%
% \schemedisplay
% (define-threaded-class (<simpler-stack>) (<base>)
%   (state [pdl '()])
%   (methods
%     [empty? () (null? pdl)]
%     [locked push (v)
%       (set! pdl (cons v pdl))]
%     [locked pop ()
%       (let ([v (car pdl)])
%         (set! pdl (cdr pdl))
%         v)]))
% \endschemedisplay
%
% \noindent
% Because it does not update the stack, \scheme{empty?} need not be
% locked.
% Unfortunately, \scheme{empty?} is not useful in a threaded environment,
% because another thread may pop the stack in between the time
% \scheme{empty?} and \scheme{pop} are called.
% In general, the entire operation to be performed, including
% any questions to be asked and any mutations to
% be performed, must be encapsulated within a single locked
% method.
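%
% The difference between the two approaches can be illustrated with
% ordinary mutex operations.
% This is a sketch under stated assumptions rather than part of the class
% system: the names \scheme{m}, \scheme{items}, \scheme{racy-pop}, and
% \scheme{safe-pop} are hypothetical, and a threaded build of
% \emph{Chez Scheme} is assumed.
%
% \schemedisplay
% (define m (make-mutex))
% (define items '())
%
% ;; Racy: the test and the removal each lock m separately, so another
% ;; thread may empty items between them.
% (define (racy-pop default)
%   (if (with-mutex m (null? items))
%       default
%       (with-mutex m
%         (let ([v (car items)])   ; raises an exception if items is now empty
%           (set! items (cdr items))
%           v))))
%
% ;; Safe: the test and the removal share one acquisition of m.
% (define (safe-pop default)
%   (with-mutex m
%     (if (null? items)
%         default
%         (let ([v (car items)])
%           (set! items (cdr items))
%           v))))
% \endschemedisplay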
%
% It is possible to have a useful method that need not be locked.
% The following version of \scheme{<stack>} maintains a count of
% objects pushed onto the stack over time, and the
% \scheme{count} method is used to retrieve this count.
%
% \schemedisplay
% (define-threaded-class (<stack>) (<base>)
%   (state [pdl '()] [count 0])
%   (methods
%     [count () count]
%     [locked push (v)
%       (set! count (+ count 1))
%       (set! pdl (cons v pdl))]
%     [locked pop (default)
%       (if (null? pdl)
%           default
%           (let ([v (car pdl)])
%             (set! pdl (cdr pdl))
%             v))]))
% \endschemedisplay
%
% \noindent
% Although \scheme{count} may be out of date as soon as the caller
% receives it, the value returned is guaranteed to be a valid
% count at some time after the method call is made and before the
% method call returns.
%
% A condition variable is associated with each instance of the class
% for each \scheme{(\var{cname} \var{pred})} pair in the
% \scheme{conditions} subform.
% The identifiers \scheme{\var{cname} \dots} name the associated
% conditions within the methods of the class.
% The predicate \var{pred} associated with a condition is an
% expression that should evaluate to a true value if and only
% if the condition is considered satisfied.
% The predicate expression may refer to the instance variables
% of the class.
%
% A method waits for a condition to be satisfied using the
% \scheme{require} form, which is valid only within the locked
% methods of the class.
%
% \schemedisplay
% (require \var{cname} \var{body})
% \endschemedisplay
%
% \noindent
% When a thread evaluates a \scheme{require} form, it evaluates
% the predicate associated with \var{cname}.
% If the predicate returns a true value, the thread proceeds to
% evaluate \var{body}.
% If the predicate returns false, the thread waits on the associated
% condition variable, as if with an explicit call to \scheme{condition-wait}.
% Upon being released from the condition variable (by a signal
% or broadcast from another thread), the thread loops back to
% check the predicate again.
% Control thus does not reach the body of the \scheme{require}
% form until the predicate evaluates to a true value.
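%
% The waiting behavior just described corresponds roughly to the loop
% below.
% This is a sketch, not the actual expansion of \scheme{require}: the
% names \scheme{m}, \scheme{room}, \scheme{i}, and
% \scheme{call-when-room} are hypothetical stand-ins for the instance
% mutex, a condition, an instance variable, and a locked method, and a
% threaded build of \emph{Chez Scheme} is assumed.
%
% \schemedisplay
% (define m (make-mutex))
% (define room (make-condition))
% (define i 0)
%
% (define (call-when-room thunk)
%   (with-mutex m
%     (let loop ()
%       (unless (not (= i 0))        ; the predicate paired with room
%         ;; condition-wait releases m while waiting and reacquires it
%         ;; before returning, so the predicate is rechecked under the lock
%         (condition-wait room m)
%         (loop)))
%     (thunk)))
% \endschemedisplay
%
% \noindent
% Rechecking the predicate in a loop, rather than testing it once, means
% the thunk runs only when the predicate actually holds, even if the
% thread is released while the condition is not yet satisfied.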
%
% Waiting threads may be released from a condition by applying
% either \scheme{condition-signal} or \scheme{condition-broadcast}
% to the value of the corresponding \var{cname}.
% %%% [Question: must the signaling thread be locked?]
%
% Here is a \scheme{<bq>} class that redefines the bounded queue.
% Compare this with the earlier definitions of \scheme{<bq>},
% \scheme{enqueue!}, and \scheme{dequeue!}.
%
% \schemedisplay
% (define-threaded-class (<bq> i) (<base>)
%   (state [i i] [vec (make-vector i)])
%   (conditions
%     [ready (not (= i (vector-length vec)))]
%     [room (not (= i 0))])
%   (methods
%     [locked enqueue! (item)
%       (require room
%         (set! i (- i 1))
%         (vector-set! vec i item)
%         (condition-signal ready))]
%     [locked dequeue! ()
%       (require ready
%         (let ([item (vector-ref vec i)])
%           (set! i (+ i 1))
%           (condition-signal room)
%           item))]))
% \endschemedisplay
%
%