1522 lines
54 KiB
Scheme
1522 lines
54 KiB
Scheme
|
;;; thread.ms
|
||
|
;;; Copyright 1984-2017 Cisco Systems, Inc.
|
||
|
;;;
|
||
|
;;; Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
;;; you may not use this file except in compliance with the License.
|
||
|
;;; You may obtain a copy of the License at
|
||
|
;;;
|
||
|
;;; http://www.apache.org/licenses/LICENSE-2.0
|
||
|
;;;
|
||
|
;;; Unless required by applicable law or agreed to in writing, software
|
||
|
;;; distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
;;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
;;; See the License for the specific language governing permissions and
|
||
|
;;; limitations under the License.
|
||
|
|
||
|
; why can't this come after misc.ms?
|
||
|
|
||
|
(define-syntax when-threaded
|
||
|
(lambda (x)
|
||
|
(syntax-case x ()
|
||
|
[(_ e ...)
|
||
|
(if (threaded?)
|
||
|
#'(begin (void) e ...)
|
||
|
#'(void))])))
|
||
|
|
||
|
(mat engine-thread
|
||
|
(let ()
|
||
|
(define fattercode
|
||
|
'(module (fatterfib fatter slimmer zero)
|
||
|
(define zero? null?)
|
||
|
(define 1+ list)
|
||
|
(define 1- car)
|
||
|
(define zero '())
|
||
|
(define (fatter n)
|
||
|
(if (= n 0)
|
||
|
zero
|
||
|
(1+ (fatter (- n 1)))))
|
||
|
(define (slimmer x)
|
||
|
(if (zero? x)
|
||
|
0
|
||
|
(+ (slimmer (1- x)) 1)))
|
||
|
(define fatter+
|
||
|
(lambda (x y)
|
||
|
(if (zero? y)
|
||
|
x
|
||
|
(fatter+ (1+ x) (1- y)))))
|
||
|
(define fatterfib
|
||
|
(lambda (x)
|
||
|
(if (or (zero? x) (zero? (1- x)))
|
||
|
(1+ zero)
|
||
|
(fatter+ (fatterfib (1- x)) (fatterfib (1- (1- x)))))))))
|
||
|
(define fatter-eval
|
||
|
(lambda (eval loc)
|
||
|
(lambda ()
|
||
|
(let f ([n 10] [a 0])
|
||
|
(if (= n 0)
|
||
|
(set-car! loc a)
|
||
|
(f (- n 1)
|
||
|
(+ a (eval `(let ()
|
||
|
,fattercode
|
||
|
(slimmer (fatterfib (fatter 10))))))))))))
|
||
|
|
||
|
(define (engine-eval-test n)
|
||
|
(define thread-list '())
|
||
|
(define fork-thread
|
||
|
(lambda (x)
|
||
|
(set! thread-list (cons (make-engine x) thread-list))))
|
||
|
(define locs (map list (make-list n 0)))
|
||
|
(for-each
|
||
|
(lambda (x) (fork-thread (fatter-eval interpret x)))
|
||
|
locs)
|
||
|
(let f ([q thread-list])
|
||
|
(unless (null? q)
|
||
|
((car q)
|
||
|
10
|
||
|
(lambda (t . r) (f (cdr q)))
|
||
|
(lambda (x) (f (append (cdr q) (list x)))))))
|
||
|
(map car locs))
|
||
|
|
||
|
(equal? (engine-eval-test 7) '(890 890 890 890 890 890 890)))
|
||
|
)
|
||
|
|
||
|
(when-threaded
|
||
|
(mat thread
|
||
|
(let ([m (make-mutex)] [c (make-condition)]
|
||
|
[m2 (make-mutex 'mname)] [c2 (make-condition 'cname)])
|
||
|
(and (mutex? m)
|
||
|
(thread-condition? c)
|
||
|
(mutex? m2)
|
||
|
(thread-condition? c2)
|
||
|
(not (mutex? c))
|
||
|
(not (thread-condition? m))
|
||
|
(not (mutex? c2))
|
||
|
(not (thread-condition? m2))
|
||
|
(not (mutex-name m))
|
||
|
(not (condition-name c))
|
||
|
(eq? 'mname (mutex-name m2))
|
||
|
(eq? 'cname (condition-name c2))
|
||
|
(not (mutex? 'mutex))
|
||
|
(not (thread-condition? 'condition))))
|
||
|
(begin
|
||
|
(define $fib (lambda (x) (if (< x 2) 1 (+ ($fib (- x 1)) ($fib (- x 2))))))
|
||
|
(let-syntax ([mats-dir-relative
|
||
|
(lambda (x)
|
||
|
(syntax-case x ()
|
||
|
[(_ (include ?path))
|
||
|
(string? (datum ?path))
|
||
|
#`(include #,(format "~a/~a" *mats-dir* (datum ?path)))]))])
|
||
|
(mats-dir-relative
|
||
|
(include "../../mats/thread-check.ss")))
|
||
|
(define $time-in-range?
|
||
|
(lambda (start stop target)
|
||
|
(let ([t (time-difference stop start)])
|
||
|
(<= (abs (- (+ (time-second t) (* (time-nanosecond t) 1e-9)) target))
|
||
|
0.2))))
|
||
|
(andmap procedure? (list $threads $fib $thread-check $time-in-range?)))
|
||
|
($thread-check)
|
||
|
(not (= (let ([n #f])
|
||
|
(fork-thread (lambda () (set! n (get-thread-id))))
|
||
|
(let f () (if n n (begin ($yield) (f)))))
|
||
|
0))
|
||
|
($thread-check)
|
||
|
(= (let ([n #f])
|
||
|
(fork-thread (lambda () (set! n (get-process-id))))
|
||
|
(let f () (if n n (begin ($yield) (f)))))
|
||
|
(get-process-id))
|
||
|
($thread-check)
|
||
|
(equal?
|
||
|
(let ()
|
||
|
(define fat+
|
||
|
(lambda (x y)
|
||
|
(if (zero? y)
|
||
|
x
|
||
|
(fat+ (1+ x) (1- y)))))
|
||
|
(define fatfib
|
||
|
(lambda (x)
|
||
|
(if (< x 2)
|
||
|
1
|
||
|
(fat+ (fatfib (1- x)) (fatfib (1- (1- x)))))))
|
||
|
(define iota
|
||
|
(case-lambda
|
||
|
[(n) (iota 0 n)]
|
||
|
[(i n)
|
||
|
(if (= n 0)
|
||
|
'()
|
||
|
(cons i (iota (+ i 1) (- n 1))))]))
|
||
|
(define-syntax parallel-list
|
||
|
(syntax-rules ()
|
||
|
[(_ x ...)
|
||
|
(let ([v (make-vector (length '(x ...)) #f)]
|
||
|
[m (make-mutex)]
|
||
|
[c (make-condition)]
|
||
|
[n (length '(x ...))])
|
||
|
(map (lambda (p i) (p i))
|
||
|
(list (lambda (i)
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(vector-set! v i x)
|
||
|
(with-mutex m
|
||
|
(set! n (- n 1))
|
||
|
(when (= n 0)
|
||
|
(condition-signal c))))))
|
||
|
...)
|
||
|
(iota (length '(x ...))))
|
||
|
(and (with-mutex m
|
||
|
(condition-wait c m (make-time 'time-duration 0 60)))
|
||
|
(vector->list v)))]))
|
||
|
(parallel-list (fatfib 26) (fatfib 27) (fatfib 28)
|
||
|
(fatfib 29) (fatfib 30) (fatfib 31)))
|
||
|
'(196418 317811 514229 832040 1346269 2178309))
|
||
|
($thread-check)
|
||
|
(let ([m (make-mutex)] [c (make-condition)])
|
||
|
(with-mutex m
|
||
|
(let* ([start (current-time)]
|
||
|
[r (condition-wait c m (make-time 'time-duration 250000000 1))]
|
||
|
[stop (current-time)])
|
||
|
(and (not r)
|
||
|
($time-in-range? start stop 1.25)))))
|
||
|
(let ([m (make-mutex)] [c (make-condition)])
|
||
|
(with-mutex m
|
||
|
(let* ([start (current-time)]
|
||
|
[r (condition-wait c m
|
||
|
(add-duration start (make-time 'time-duration 250000000 1)))]
|
||
|
[stop (current-time)])
|
||
|
(and (not r)
|
||
|
($time-in-range? start stop 1.25)))))
|
||
|
(let ([m (make-mutex)] [c (make-condition)])
|
||
|
(with-mutex m
|
||
|
(let* ([start (current-time)]
|
||
|
[r (condition-wait c m (make-time 'time-duration 0 -1))]
|
||
|
[stop (current-time)])
|
||
|
(and (not r)
|
||
|
($time-in-range? start stop 0.0)))))
|
||
|
(let ([m (make-mutex)] [c (make-condition)])
|
||
|
(with-mutex m
|
||
|
(let* ([start (current-time)]
|
||
|
[r (condition-wait c m
|
||
|
(add-duration start (make-time 'time-duration 0 -1)))]
|
||
|
[stop (current-time)])
|
||
|
(and (not r)
|
||
|
($time-in-range? start stop 0.0)))))
|
||
|
(let ([m (make-mutex)] [c (make-condition)])
|
||
|
(with-mutex m
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(with-mutex m (sleep (make-time 'time-duration 250000000 0)))))
|
||
|
(let* ([start (current-time)]
|
||
|
[r (condition-wait c m (make-time 'time-duration 100000000 0))]
|
||
|
[stop (current-time)])
|
||
|
(and (not r)
|
||
|
($time-in-range? start stop 0.25)))))
|
||
|
(let ([count 300] [live 0] [live-m (make-mutex)])
|
||
|
(parameterize ([collect-request-handler
|
||
|
(lambda ()
|
||
|
(set! count (- count 1))
|
||
|
(collect))])
|
||
|
(define chew (lambda (x) (when (> count 0) (chew (list 0)))))
|
||
|
(define (gc-test n)
|
||
|
(set! live n)
|
||
|
(do ([i 0 (+ i 1)])
|
||
|
((= i n))
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(chew 0)
|
||
|
(with-mutex live-m (set! live (- live 1)))))))
|
||
|
(gc-test 4)
|
||
|
(chew 0))
|
||
|
; wait for the others to die
|
||
|
(let f () (unless (= live 0) ($yield) (f)))
|
||
|
#t)
|
||
|
($thread-check)
|
||
|
(let ([count 300] [live 0] [live-m (make-mutex)])
|
||
|
(parameterize ([collect-request-handler
|
||
|
(lambda ()
|
||
|
(set! count (- count 1))
|
||
|
(collect))])
|
||
|
(define chew (lambda (x) (when (> count 0) (chew (list 0)))))
|
||
|
(define (gc-test n)
|
||
|
(set! live n)
|
||
|
(do ([i 0 (+ i 1)])
|
||
|
((= i n))
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(chew 0)
|
||
|
(with-mutex live-m (set! live (- live 1)))))))
|
||
|
(let ([m1 (make-mutex)]
|
||
|
[m2 (make-mutex)]
|
||
|
[c1 (make-condition)]
|
||
|
[c2 (make-condition)])
|
||
|
; suspend one thread on a condition, waiting for it to get there
|
||
|
(with-mutex m1
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(with-mutex m1
|
||
|
(condition-signal c1)
|
||
|
(condition-wait c1 m1)
|
||
|
(condition-signal c1))))
|
||
|
(condition-wait c1 m1))
|
||
|
; try to get another suspended on a mutex
|
||
|
(mutex-acquire m2)
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(with-mutex m2 (condition-signal c2))))
|
||
|
; start some threads a-allocating
|
||
|
(gc-test 3)
|
||
|
; join in the fun
|
||
|
(chew 0)
|
||
|
; release the suspended threads
|
||
|
(with-mutex m1
|
||
|
(condition-signal c1)
|
||
|
(condition-wait c1 m1))
|
||
|
(condition-wait c2 m2)
|
||
|
(mutex-release m2)))
|
||
|
; wait for the others to die
|
||
|
(let f () (unless (= live 0) ($yield) (f)))
|
||
|
#t)
|
||
|
($thread-check)
|
||
|
(let ([count 300] [live 0] [live-m (make-mutex)])
|
||
|
(define fat+
|
||
|
(lambda (x y)
|
||
|
(if (zero? y)
|
||
|
x
|
||
|
(fat+ (1+ x) (1- y)))))
|
||
|
(define fatfib
|
||
|
(lambda (x)
|
||
|
(if (< x 2)
|
||
|
1
|
||
|
(fat+ (fatfib (1- x)) (fatfib (1- (1- x)))))))
|
||
|
(parameterize ([collect-request-handler
|
||
|
(lambda ()
|
||
|
(set! count (- count 1))
|
||
|
(collect))])
|
||
|
(define chew (lambda (x) (when (> count 0) (chew (list 0)))))
|
||
|
(define (gc-test n)
|
||
|
(set! live n)
|
||
|
(do ([i 0 (+ i 1)])
|
||
|
((= i n))
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(chew 0)
|
||
|
(with-mutex live-m (set! live (- live 1)))))))
|
||
|
(let ([m1 (make-mutex)]
|
||
|
[m2 (make-mutex)]
|
||
|
[c1 (make-condition)]
|
||
|
[c2 (make-condition)])
|
||
|
; suspend one thread on a condition, waiting for it to get there
|
||
|
(with-mutex m1
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(with-mutex m1
|
||
|
(condition-signal c1)
|
||
|
(condition-wait c1 m1)
|
||
|
(condition-signal c1))))
|
||
|
(condition-wait c1 m1))
|
||
|
; try to get another suspended on a mutex
|
||
|
(mutex-acquire m2)
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(with-mutex m2 (condition-signal c2))))
|
||
|
; start some threads a-allocating
|
||
|
(gc-test 5)
|
||
|
; create threads that die quickly while waiting for
|
||
|
; our count collections to occur
|
||
|
(let ([live 0] [live-m (make-mutex)])
|
||
|
(let f ()
|
||
|
(when (> count 0)
|
||
|
(fatfib 20)
|
||
|
(with-mutex live-m
|
||
|
; cap the number of quickly dying threads, or ti3nb
|
||
|
; runs out of memory somewhere later in the mat run.
|
||
|
; theory is that virtual address space gets chopped
|
||
|
; up and that the main thread can't access virtual
|
||
|
; memory previously assigned to a child thread.
|
||
|
(if (< live 20)
|
||
|
; don't panic if we try to create too many...
|
||
|
(guard (c [#t (values)])
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(with-mutex live-m (set! live (- live 1)))))
|
||
|
(set! live (+ live 1)))))
|
||
|
(f))))
|
||
|
; release the suspended threads
|
||
|
(with-mutex m1
|
||
|
(condition-signal c1)
|
||
|
(condition-wait c1 m1))
|
||
|
(condition-wait c2 m2)
|
||
|
(mutex-release m2)))
|
||
|
; wait for the others to die
|
||
|
(let f () (unless (= live 0) ($yield) (f)))
|
||
|
#t)
|
||
|
($thread-check)
|
||
|
(let ([count 30])
|
||
|
(parameterize ([collect-request-handler
|
||
|
(lambda ()
|
||
|
(set! count (- count 1))
|
||
|
(collect))])
|
||
|
(define chew (lambda (x) (when (> count 0) (chew (list 0)))))
|
||
|
(define (gc-test4 n)
|
||
|
(define fib
|
||
|
(lambda (x)
|
||
|
(if (< x 2)
|
||
|
1
|
||
|
(+ (fib (- x 1)) (fib (- x 2))))))
|
||
|
; stall collection for a bit
|
||
|
(disable-interrupts)
|
||
|
(do ([i 0 (+ i 1)])
|
||
|
((= i n))
|
||
|
(fork-thread (lambda () (enable-interrupts) (chew 0))))
|
||
|
(fib 35)
|
||
|
(enable-interrupts))
|
||
|
(gc-test4 4)
|
||
|
(chew 0)
|
||
|
#t))
|
||
|
($thread-check)
|
||
|
(eqv?
|
||
|
(let ()
|
||
|
(define m (make-mutex))
|
||
|
(define fib
|
||
|
(lambda (n)
|
||
|
(if (with-mutex m (< n 2))
|
||
|
1
|
||
|
(+ (fib (- n 1)) (fib (- n 2))))))
|
||
|
(fib 20))
|
||
|
10946)
|
||
|
($thread-check)
|
||
|
(let ()
|
||
|
(define m (make-mutex))
|
||
|
(define mgrab
|
||
|
(lambda (n)
|
||
|
(unless (fxzero? n)
|
||
|
(mutex-acquire m)
|
||
|
(mutex-release m)
|
||
|
(mgrab (fx- n 1)))))
|
||
|
(mgrab 100)
|
||
|
#t)
|
||
|
($thread-check)
|
||
|
(let ()
|
||
|
(define m (make-mutex))
|
||
|
(define recmgrab
|
||
|
(lambda (n)
|
||
|
(set! m (make-mutex))
|
||
|
(let f ([n n])
|
||
|
(unless (fxzero? n)
|
||
|
(mutex-acquire m)
|
||
|
(f (fx- n 1))
|
||
|
(mutex-release m)))
|
||
|
(fork-thread (lambda () (mutex-acquire m) #;(pretty-print 'bye)))))
|
||
|
(recmgrab 100)
|
||
|
#t)
|
||
|
($thread-check)
|
||
|
(eqv?
|
||
|
(let ([cnt 0] [ans 0])
|
||
|
(define m (make-mutex))
|
||
|
(mutex-acquire m)
|
||
|
(fork-thread
|
||
|
(rec f
|
||
|
(lambda ()
|
||
|
(with-mutex m (set! cnt (+ cnt 1)))
|
||
|
(if (= ans 0)
|
||
|
(begin ($yield) (f))
|
||
|
(set! ans (- ans))))))
|
||
|
(mutex-release m)
|
||
|
(let f () (when (= cnt 0) (begin ($yield) (f))))
|
||
|
(set! ans 17)
|
||
|
(let f () (if (< ans 0) ans (begin ($yield) (f)))))
|
||
|
-17)
|
||
|
($thread-check)
|
||
|
(eqv?
|
||
|
(let ()
|
||
|
(define ans 0)
|
||
|
(define cnt 0)
|
||
|
(define m (make-mutex))
|
||
|
(define c (make-condition))
|
||
|
(define (f n)
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(with-mutex m
|
||
|
(set! ans (+ ans n))
|
||
|
(set! cnt (+ cnt 1))
|
||
|
(condition-wait c m)
|
||
|
(set! cnt (- cnt 1))))))
|
||
|
(f 1)
|
||
|
(f 2)
|
||
|
(f 4)
|
||
|
(let f () (unless (= cnt 3) (begin ($yield) (f))))
|
||
|
(with-mutex m (condition-broadcast c))
|
||
|
(let f () (unless (= cnt 0) (begin ($yield) (f))))
|
||
|
ans)
|
||
|
7)
|
||
|
($thread-check)
|
||
|
(let ([nthreads (length ($threads))])
|
||
|
(define-record bounded-queue (i)
|
||
|
([vec (make-vector i)]
|
||
|
[mutex (make-mutex)]
|
||
|
[ready (make-condition)]
|
||
|
[room (make-condition)]
|
||
|
[waiting 0]
|
||
|
[die? #f]))
|
||
|
|
||
|
(define queue-empty?
|
||
|
(lambda (bq)
|
||
|
(with-mutex (bounded-queue-mutex bq)
|
||
|
(= (bounded-queue-i bq)
|
||
|
(vector-length (bounded-queue-vec bq))))))
|
||
|
|
||
|
(define enqueue!
|
||
|
(lambda (item bq)
|
||
|
(let ([mutex (bounded-queue-mutex bq)])
|
||
|
(with-mutex mutex
|
||
|
(let loop ()
|
||
|
(unless (bounded-queue-die? bq)
|
||
|
(if (zero? (bounded-queue-i bq))
|
||
|
(begin
|
||
|
(condition-wait (bounded-queue-room bq) mutex)
|
||
|
; we grab the mutex when we wake up, but some other thread may
|
||
|
; beat us to the punch
|
||
|
(loop))
|
||
|
(let ([i (- (bounded-queue-i bq) 1)])
|
||
|
(vector-set! (bounded-queue-vec bq) i item)
|
||
|
(set-bounded-queue-i! bq i)
|
||
|
(unless (zero? (bounded-queue-waiting bq))
|
||
|
(condition-signal (bounded-queue-ready bq)))))))))))
|
||
|
|
||
|
(define dequeue!
|
||
|
(lambda (bq)
|
||
|
(let ([mutex (bounded-queue-mutex bq)])
|
||
|
(with-mutex mutex
|
||
|
(let loop ()
|
||
|
(unless (bounded-queue-die? bq)
|
||
|
(if (= (bounded-queue-i bq) (vector-length (bounded-queue-vec bq)))
|
||
|
(begin
|
||
|
(set-bounded-queue-waiting! bq
|
||
|
(+ (bounded-queue-waiting bq) 1))
|
||
|
(condition-wait (bounded-queue-ready bq) mutex)
|
||
|
(set-bounded-queue-waiting! bq
|
||
|
(- (bounded-queue-waiting bq) 1))
|
||
|
; we grab the mutex when we wake up, but some other thread may
|
||
|
; beat us to the punch
|
||
|
(loop))
|
||
|
(let ([i (bounded-queue-i bq)])
|
||
|
(let ([item (vector-ref (bounded-queue-vec bq) i)])
|
||
|
(set-bounded-queue-i! bq (+ i 1))
|
||
|
(condition-signal (bounded-queue-room bq))
|
||
|
item)))))))))
|
||
|
|
||
|
(define job-queue)
|
||
|
|
||
|
(define fib
|
||
|
(lambda (n)
|
||
|
(if (< n 2)
|
||
|
n
|
||
|
(+ (fib (- n 2)) (fib (- n 1))))))
|
||
|
|
||
|
(define make-job
|
||
|
(let ([count 0])
|
||
|
(lambda (n)
|
||
|
(set! count (+ count 1))
|
||
|
(printf "Adding job #~s = (lambda () (fib ~s))\n" count n)
|
||
|
(cons count (lambda () (fib n))))))
|
||
|
|
||
|
(define make-producer
|
||
|
(lambda (n)
|
||
|
(rec producer
|
||
|
(lambda ()
|
||
|
(printf "producer ~s posting a job\n" n)
|
||
|
(enqueue! (make-job (+ 20 (random 10))) job-queue)
|
||
|
(if (bounded-queue-die? job-queue)
|
||
|
(printf "producer ~s dying\n" n)
|
||
|
(producer))))))
|
||
|
|
||
|
(define make-consumer
|
||
|
(lambda (n)
|
||
|
(rec consumer
|
||
|
(lambda ()
|
||
|
(printf "consumer ~s looking for a job~%" n)
|
||
|
(let ([job (dequeue! job-queue)])
|
||
|
(if (bounded-queue-die? job-queue)
|
||
|
(printf "consumer ~s dying\n" n)
|
||
|
(begin
|
||
|
(printf "consumer ~s executing job #~s~%" n (car job))
|
||
|
(printf "consumer ~s computed: ~s~%" n ((cdr job)))
|
||
|
(consumer))))))))
|
||
|
|
||
|
(define (bq-test np nc)
|
||
|
(set! job-queue (make-bounded-queue (max nc np)))
|
||
|
(do ([np np (- np 1)])
|
||
|
((<= np 0))
|
||
|
(fork-thread (make-producer np)))
|
||
|
(do ([nc nc (- nc 1)])
|
||
|
((<= nc 0))
|
||
|
(fork-thread (make-consumer nc))))
|
||
|
|
||
|
(bq-test 3 4)
|
||
|
(set! ans (fib 35))
|
||
|
; flush out the waiting producers and consumers
|
||
|
(set-bounded-queue-die?! job-queue #t)
|
||
|
(let ()
|
||
|
(let f ()
|
||
|
(unless (= (length ($threads)) nthreads)
|
||
|
(with-mutex (bounded-queue-mutex job-queue)
|
||
|
(condition-signal (bounded-queue-room job-queue))
|
||
|
(condition-signal (bounded-queue-ready job-queue)))
|
||
|
($yield)
|
||
|
(f))))
|
||
|
(eqv? ans 9227465))
|
||
|
($thread-check)
|
||
|
(let ([ans #f])
|
||
|
(define fattercode
|
||
|
'(module (fatterfib fatter slimmer zero)
|
||
|
(define zero? null?)
|
||
|
(define 1+ list)
|
||
|
(define 1- car)
|
||
|
(define zero '())
|
||
|
(define (fatter n)
|
||
|
(if (= n 0)
|
||
|
zero
|
||
|
(1+ (fatter (- n 1)))))
|
||
|
(define (slimmer x)
|
||
|
(if (zero? x)
|
||
|
0
|
||
|
(+ (slimmer (1- x)) 1)))
|
||
|
(define fatter+
|
||
|
(lambda (x y)
|
||
|
(if (zero? y)
|
||
|
x
|
||
|
(fatter+ (1+ x) (1- y)))))
|
||
|
(define fatterfib
|
||
|
(lambda (x)
|
||
|
(if (or (zero? x) (zero? (1- x)))
|
||
|
(1+ zero)
|
||
|
(fatter+ (fatterfib (1- x)) (fatterfib (1- (1- x)))))))))
|
||
|
(define fatter-eval
|
||
|
(lambda (eval loc)
|
||
|
(lambda ()
|
||
|
(let f ([n 10] [a 0])
|
||
|
(if (= n 0)
|
||
|
(set-car! loc a)
|
||
|
(f (- n 1)
|
||
|
(+ a (eval `(let ()
|
||
|
,fattercode
|
||
|
(slimmer (fatterfib (fatter 10))))))))))))
|
||
|
|
||
|
(define (thread-eval-test n)
|
||
|
(define locs (map list (make-list n 0)))
|
||
|
(for-each
|
||
|
; compiler is not reentrant wrt threads, so must use interpret
|
||
|
(lambda (loc) (fork-thread (fatter-eval interpret loc)))
|
||
|
locs)
|
||
|
(let f ()
|
||
|
(when (ormap zero? (map car locs))
|
||
|
($yield)
|
||
|
(f)))
|
||
|
(map car locs))
|
||
|
|
||
|
(equal? (thread-eval-test 7) '(890 890 890 890 890 890 890)))
|
||
|
($thread-check)
|
||
|
(equal?
|
||
|
(let ([x1 #f] [x2 #f] [x3 #f] [x4 #f] [x5 #f])
|
||
|
(define p (make-thread-parameter 0))
|
||
|
(set! x1 (p))
|
||
|
(p 3)
|
||
|
(set! x2 (p))
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(set! x3 (p))
|
||
|
(p 5)
|
||
|
(set! x4 (p))))
|
||
|
(let f () (unless x4 ($yield) (f)))
|
||
|
(set! x5 (p))
|
||
|
(list x1 x2 x3 x4 x5))
|
||
|
'(0 3 3 5 3))
|
||
|
($thread-check)
|
||
|
(equal?
|
||
|
(let ([x1 #f] [x2 #f] [x3 #f] [x4 #f] [x5 #f])
|
||
|
(define p (make-thread-parameter 0))
|
||
|
(define m (make-mutex))
|
||
|
(define c (make-condition))
|
||
|
(set! x1 (p))
|
||
|
(p 3)
|
||
|
(set! x2 (p))
|
||
|
(with-mutex m
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(set! x3 (p))
|
||
|
(p 5)
|
||
|
(set! x4 (p))
|
||
|
(with-mutex m (condition-signal c))))
|
||
|
(condition-wait c m))
|
||
|
(set! x5 (p))
|
||
|
(list x1 x2 x3 x4 x5))
|
||
|
'(0 3 3 5 3))
|
||
|
($thread-check)
|
||
|
(let ()
|
||
|
(define done? #f)
|
||
|
(define m (make-mutex))
|
||
|
(define spin
|
||
|
(lambda ()
|
||
|
(unless done?
|
||
|
(if (mutex-acquire m #f)
|
||
|
(set! done? #t)
|
||
|
(begin ($yield) (spin))))))
|
||
|
(fork-thread spin)
|
||
|
(fork-thread spin)
|
||
|
(spin)
|
||
|
done?)
|
||
|
($thread-check)
|
||
|
(eqv?
|
||
|
(let ()
|
||
|
(define-record accumulator ()
|
||
|
([acc 0]
|
||
|
[increments 0]
|
||
|
[(immutable mutex) (make-mutex)]))
|
||
|
(define incr!
|
||
|
(lambda (a n)
|
||
|
(with-mutex (accumulator-mutex a)
|
||
|
(set-accumulator-acc! a (+ (accumulator-acc a) n))
|
||
|
(set-accumulator-increments! a (+ (accumulator-increments a) 1)))))
|
||
|
(define fact
|
||
|
(lambda (x)
|
||
|
(if (= x 0)
|
||
|
1
|
||
|
(* x (fact (- x 1))))))
|
||
|
(define foo (lambda (x) (string-length (number->string (fact x)))))
|
||
|
(let ()
|
||
|
(define acc (make-accumulator))
|
||
|
(define bar
|
||
|
(lambda (x)
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(incr! acc (foo x))))))
|
||
|
(bar 1100)
|
||
|
(bar 1200)
|
||
|
(bar 1300)
|
||
|
(bar 1400)
|
||
|
(bar 1500)
|
||
|
(bar 1400)
|
||
|
(bar 1300)
|
||
|
(bar 1200)
|
||
|
(bar 1100)
|
||
|
(let f ()
|
||
|
(unless (= (accumulator-increments acc) 9)
|
||
|
($yield)
|
||
|
(f)))
|
||
|
(accumulator-acc acc)))
|
||
|
30777)
|
||
|
($thread-check)
|
||
|
(let ()
|
||
|
(define fat+
|
||
|
(lambda (x y)
|
||
|
(if (zero? y)
|
||
|
x
|
||
|
(fat+ (1+ x) (1- y)))))
|
||
|
(define fatfib
|
||
|
(lambda (x)
|
||
|
(if (< x 2)
|
||
|
1
|
||
|
(fat+ (fatfib (1- x)) (fatfib (1- (1- x)))))))
|
||
|
(set! ans (fatfib 30))
|
||
|
(collect)
|
||
|
(eqv? ans 1346269))
|
||
|
($thread-check)
|
||
|
(eq?
|
||
|
(let ()
|
||
|
(define bt (foreign-procedure "(cs)backdoor_thread" (scheme-object) iptr))
|
||
|
(let f ([n 100])
|
||
|
(let ([q 0])
|
||
|
(unless (= n 0)
|
||
|
(let ([p (lambda ()
|
||
|
(let ([count 10])
|
||
|
(define chew (lambda (x) (when (> count 0) (chew (list 0)))))
|
||
|
(parameterize ([collect-request-handler
|
||
|
(lambda ()
|
||
|
(set! count (- count 1))
|
||
|
(collect))])
|
||
|
(chew 0)))
|
||
|
(set! q (+ q 7)))])
|
||
|
(lock-object p)
|
||
|
(bt p)
|
||
|
(let f () (when (= q 0) ($yield) (f)))
|
||
|
(let f () (unless (= (length ($threads)) 1) ($yield) (f)))
|
||
|
(unlock-object p))
|
||
|
(unless (= q 14) (errorf #f "~s isn't 14" q))
|
||
|
(f (- n 1)))))
|
||
|
'cool)
|
||
|
'cool)
|
||
|
($thread-check)
|
||
|
(let ()
|
||
|
(define m (make-mutex))
|
||
|
(define c (make-condition))
|
||
|
(define nthreads 4)
|
||
|
(define saved-condition #f)
|
||
|
(with-mutex m
|
||
|
(let f ([n nthreads])
|
||
|
(unless (= n 0)
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(guard (c [else (set! saved-condition c)])
|
||
|
(let g ([k 1000])
|
||
|
(unless (= k 0)
|
||
|
(let ([op (open-file-output-port (format "testfile~s.ss" n) (file-options replace))])
|
||
|
(port-file-compressed! op)
|
||
|
(put-u8 op 104)
|
||
|
(close-port op))
|
||
|
(g (- k 1)))))
|
||
|
(with-mutex m
|
||
|
(set! nthreads (- nthreads 1))
|
||
|
(when (= nthreads 0)
|
||
|
(condition-signal c)))))
|
||
|
(f (- n 1))))
|
||
|
(condition-wait c m))
|
||
|
(when saved-condition (raise saved-condition))
|
||
|
#t)
|
||
|
($thread-check)
|
||
|
(let ()
|
||
|
(define m (make-mutex))
|
||
|
(define c (make-condition))
|
||
|
(define nthreads 4)
|
||
|
(define saved-condition #f)
|
||
|
(with-mutex m
|
||
|
(let f ([n nthreads])
|
||
|
(unless (= n 0)
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(guard (c [else (set! saved-condition c)])
|
||
|
(let g ([k 1000])
|
||
|
(unless (= k 0)
|
||
|
(let ([ip (open-file-input-port (format "testfile~s.ss" n))])
|
||
|
(port-file-compressed! ip)
|
||
|
(let ([b (get-u8 ip)])
|
||
|
(unless (eqv? b 104)
|
||
|
(error #f "thread ~s read wrong value ~s" n b)))
|
||
|
(close-port ip))
|
||
|
(g (- k 1)))))
|
||
|
(with-mutex m
|
||
|
(set! nthreads (- nthreads 1))
|
||
|
(when (= nthreads 0) (condition-signal c)))))
|
||
|
(f (- n 1))))
|
||
|
(condition-wait c m))
|
||
|
(when saved-condition (raise saved-condition))
|
||
|
#t)
|
||
|
($thread-check)
|
||
|
(begin
|
||
|
(module ($tt-count $tt-done!)
|
||
|
(define n 0)
|
||
|
(define done-mutex (make-mutex))
|
||
|
(define $tt-count (lambda () n))
|
||
|
(define ($tt-done!)
|
||
|
(with-mutex done-mutex
|
||
|
(set! n (+ n 1)))))
|
||
|
(define $tt-spam (make-thread-parameter 'main))
|
||
|
(define $tt-run (make-thread-parameter void))
|
||
|
(define $tt-global-mutex (make-mutex))
|
||
|
(define $tt-fat
|
||
|
(make-thread-parameter
|
||
|
(let f ([n 100] [ls (oblist)])
|
||
|
(define (pick-rem ls)
|
||
|
(let f ([ls ls] [i (random (length ls))])
|
||
|
(if (fx= i 0)
|
||
|
(values (car ls) (cdr ls))
|
||
|
(let-values ([(x d) (f (cdr ls) (fx- i 1))])
|
||
|
(values x (cons (car ls) d))))))
|
||
|
(if (= n 0)
|
||
|
'()
|
||
|
(let-values ([(x ls) (pick-rem ls)])
|
||
|
(cons x (f (- n 1) ls)))))))
|
||
|
; original test used copy-environment, which we don't use because
|
||
|
; environments are never collected and we don't want the remaining
|
||
|
; mats to take forever to run because the heap is full of lots of
|
||
|
; unreclaimable junk. So we simulate some of the chores of copying
|
||
|
; an environment
|
||
|
(define ($tt-chew ls)
|
||
|
(let ([g* (map
|
||
|
(lambda (x)
|
||
|
(let ([g (gensym)])
|
||
|
(putprop g '*cte* (cons 'alias x))
|
||
|
(when (#%$top-level-bound? x)
|
||
|
(#%$set-top-level-value! g (#%$top-level-value x)))
|
||
|
g))
|
||
|
ls)])
|
||
|
(map (lambda (g) (cdr (getprop g '*cte*))) g*)))
|
||
|
(with-output-to-file "testfile.ss"
|
||
|
(lambda ()
|
||
|
(pretty-print
|
||
|
'($tt-run (lambda ()
|
||
|
(do ([i 2 (1- i)])
|
||
|
((= i 0))
|
||
|
(with-mutex $tt-global-mutex
|
||
|
(printf
|
||
|
"thread ~s iteration ~s\n"
|
||
|
($tt-spam)
|
||
|
i))
|
||
|
(with-mutex $tt-global-mutex
|
||
|
($tt-fat ($tt-chew ($tt-fat)))))))))
|
||
|
'replace)
|
||
|
(parameterize ([collect-request-handler
|
||
|
(lambda ()
|
||
|
(printf "~s collecting\n" ($tt-spam))
|
||
|
(collect))]
|
||
|
[collect-notify #t])
|
||
|
(time
|
||
|
(do ([i 15 (1- i)])
|
||
|
((= i 0))
|
||
|
(printf "Pass ~a~%" i)
|
||
|
(time
|
||
|
(let* ([thread-count 25]
|
||
|
#;[compile-mutex (make-mutex)]
|
||
|
[thread-finished-mutex (make-mutex)]
|
||
|
[thread-finished-count 0]
|
||
|
[program-finished-condition (make-condition)]
|
||
|
[console-mutex $tt-global-mutex]
|
||
|
[saved-condition #f])
|
||
|
(do ([i thread-count (1- i)])
|
||
|
((= i 0))
|
||
|
(mutex-acquire $tt-global-mutex)
|
||
|
(parameterize ([$tt-fat ($tt-chew ($tt-fat))])
|
||
|
(mutex-release $tt-global-mutex)
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(guard (c [else (set! saved-condition c)])
|
||
|
($tt-spam i)
|
||
|
(with-mutex console-mutex
|
||
|
(printf "Starting thread ~s...~%" i))
|
||
|
; mutex should no longer be needed as of v7.3
|
||
|
#;(with-mutex compile-mutex (load "testfile.ss"))
|
||
|
; let's see if we can load from source w/o a mutex
|
||
|
(load "testfile.ss")
|
||
|
(($tt-run))
|
||
|
; now let's see if we can compile and load object code
|
||
|
; w/o a mutex
|
||
|
(compile-file "testfile.ss" (format "testfile.~s" i))
|
||
|
(load (format "testfile.~s" i))
|
||
|
(($tt-run))
|
||
|
(with-mutex console-mutex
|
||
|
(printf "Finished ~s.~%" i)))
|
||
|
($tt-done!)
|
||
|
(with-mutex thread-finished-mutex
|
||
|
(set! thread-finished-count
|
||
|
(1+ thread-finished-count))
|
||
|
(when (= thread-finished-count
|
||
|
thread-count)
|
||
|
(condition-signal
|
||
|
program-finished-condition)))))))
|
||
|
(with-mutex thread-finished-mutex
|
||
|
(unless (= thread-finished-count thread-count)
|
||
|
(condition-wait
|
||
|
program-finished-condition
|
||
|
thread-finished-mutex)))
|
||
|
($tt-done!)
|
||
|
(when saved-condition (raise saved-condition)))))))
|
||
|
(eqv? ($tt-count) 390))
|
||
|
($thread-check)
|
||
|
; following tests garbage collection of code created and run by multiple
|
||
|
; threads. gc is done by one thread. will the others find their instruction
|
||
|
; and data caches properly synchronized?
|
||
|
(let ([thread-count 2] [iterations 10000])
|
||
|
(equal?
|
||
|
(parameterize ([collect-trip-bytes (expt 2 15)]
|
||
|
[collect-generation-radix 1])
|
||
|
(let ([out '()]
|
||
|
[out-mutex (make-mutex)]
|
||
|
[out-condition (make-condition)]
|
||
|
[error? #f])
|
||
|
(do ([i 0 (+ i 1)])
|
||
|
((= i thread-count))
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(guard (c [#t (with-mutex out-mutex (set! error? c))])
|
||
|
(let ([n (eval `(let ()
|
||
|
(define zero? null?)
|
||
|
(define 1+ list)
|
||
|
(define 1- car)
|
||
|
(define zero '())
|
||
|
(define (fatter n)
|
||
|
(if (= n 0)
|
||
|
zero
|
||
|
(1+ (fatter (- n 1)))))
|
||
|
(define (slimmer x)
|
||
|
(if (zero? x)
|
||
|
0
|
||
|
(+ (slimmer (1- x)) 1)))
|
||
|
(define fatter+
|
||
|
(lambda (x y)
|
||
|
(if (zero? y)
|
||
|
x
|
||
|
(fatter+ (1+ x) (1- y)))))
|
||
|
(define fatterfib
|
||
|
(lambda (x)
|
||
|
(if (or (zero? x) (zero? (1- x)))
|
||
|
(1+ zero)
|
||
|
(fatter+ (fatterfib (1- x)) (fatterfib (1- (1- x)))))))
|
||
|
(let loop ([n ,iterations] [a 0])
|
||
|
(if (= n 0)
|
||
|
a
|
||
|
(loop (- n 1) (+ a (slimmer (fatterfib (fatter 10)))))))))])
|
||
|
(with-mutex out-mutex
|
||
|
(set! out (cons n out))
|
||
|
(condition-signal out-condition)))))))
|
||
|
(let f ()
|
||
|
(printf "waiting for ~s more thread(s)\n" (- thread-count (length out)))
|
||
|
(with-mutex out-mutex
|
||
|
(unless (or error?
|
||
|
(= (length out) thread-count)
|
||
|
(= (length ($threads)) $nthreads))
|
||
|
(condition-wait out-condition out-mutex)))
|
||
|
(cond
|
||
|
[error? (raise error?)]
|
||
|
[(or (= (length out) thread-count)
|
||
|
(= (length ($threads)) $nthreads))
|
||
|
(printf "done\n")
|
||
|
out]
|
||
|
[else (f)]))))
|
||
|
(make-list thread-count (* 89 iterations))))
|
||
|
($thread-check)
|
||
|
; following tries to verify proper synchronization when top-level exports
|
||
|
; build up system property list. need a module with at least 21 exports to
|
||
|
; force syntax.ss to consult property list for imports
|
||
|
(let ([thread-count 20]
|
||
|
[iterations 100]
|
||
|
[syms '(a1 a2 a3 a4 a5 a6 a7 a8 a9 a10 a11 a12 a13
|
||
|
a14 a15 a16 a17 a18 a19 a20 a21 a22 a23 a24)])
|
||
|
(equal?
|
||
|
(let ([out '()] [out-mutex (make-mutex)] [error? #f])
|
||
|
(do ([i 0 (+ i 1)])
|
||
|
((= i thread-count))
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(guard (c [#t (set! error? c)])
|
||
|
(do ([j 0 (+ j 1)])
|
||
|
((= j iterations))
|
||
|
(eval `(letrec-syntax
|
||
|
([frob
|
||
|
(syntax-rules ()
|
||
|
[(_ m r) (frobh m r ,syms ,syms ())])]
|
||
|
[frobh
|
||
|
(syntax-rules ()
|
||
|
[(_ m r (s ...) () (q ...))
|
||
|
(module m (r q ...)
|
||
|
(define-syntax r
|
||
|
(identifier-syntax
|
||
|
(list ,(+ (* i thread-count) j) q ...)))
|
||
|
(define q 's)
|
||
|
...)]
|
||
|
[(_ m r (a ...) (p0 p ...) (q ...))
|
||
|
(frobh m r (a ...) (p ...) ($tx q ...))])])
|
||
|
(frob
|
||
|
,(string->symbol (format "m~s-~s" i j))
|
||
|
$tx)))
|
||
|
(let ([v (eval `(let ()
|
||
|
(import ,(string->symbol (format "m~s-~s" i j)))
|
||
|
$tx))])
|
||
|
(with-mutex out-mutex
|
||
|
(set! out (cons v out)))))))))
|
||
|
(let f ([n 0])
|
||
|
(cond
|
||
|
[error? (raise error?)]
|
||
|
[(or (= (length out) (* thread-count iterations))
|
||
|
(= (length ($threads)) $nthreads))
|
||
|
(sort (lambda (x y) (< (car x) (car y))) out)]
|
||
|
[else
|
||
|
(when (= (modulo n 100000) 0) (printf "waiting ~s ~s\n" n (length out)))
|
||
|
(f (+ n 1))])))
|
||
|
(sort (lambda (x y) (< (car x) (car y)))
|
||
|
(let f ([i 0])
|
||
|
(if (= i thread-count)
|
||
|
'()
|
||
|
(let g ([j 0])
|
||
|
(if (= j iterations)
|
||
|
(f (+ i 1))
|
||
|
(cons (cons (+ (* i thread-count) j) syms) (g (+ j 1))))))))))
|
||
|
($thread-check)
|
||
|
(let ([thread-count 20]
|
||
|
[iterations 100])
|
||
|
(equal?
|
||
|
(let ([out '()] [out-mutex (make-mutex)] [error? #f])
|
||
|
(do ([i 0 (+ i 1)])
|
||
|
((= i thread-count))
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(guard (c [#t (set! error? c)])
|
||
|
(do ([j 0 (+ j 1)])
|
||
|
((= j iterations))
|
||
|
(eval `(letrec-syntax
|
||
|
([frob
|
||
|
(syntax-rules ()
|
||
|
[(_ a)
|
||
|
(begin
|
||
|
(define-syntax $ty (identifier-syntax ,(+ (* i thread-count) j)))
|
||
|
(define-syntax a (identifier-syntax $ty)))])])
|
||
|
(frob ,(string->symbol (format "a~s-~s" i j)))))
|
||
|
(let ([v (eval (string->symbol (format "a~s-~s" i j)))])
|
||
|
(with-mutex out-mutex
|
||
|
(set! out (cons v out)))))))))
|
||
|
(let f ([n 0])
|
||
|
(cond
|
||
|
[error? (raise error?)]
|
||
|
[(or (= (length out) (* thread-count iterations))
|
||
|
(= (length ($threads)) $nthreads))
|
||
|
(sort < out)]
|
||
|
[else
|
||
|
(when (= (modulo n 100000) 0) (printf "waiting ~s ~s\n" n (length out)))
|
||
|
(f (+ n 1))])))
|
||
|
(sort <
|
||
|
(let f ([i 0])
|
||
|
(if (= i thread-count)
|
||
|
'()
|
||
|
(let g ([j 0])
|
||
|
(if (= j iterations)
|
||
|
(f (+ i 1))
|
||
|
(cons (+ (* i thread-count) j) (g (+ j 1))))))))))
|
||
|
($thread-check)
|
||
|
; this mat has some inherent starvation issues, with the main thread
|
||
|
; looping rather than waiting on a condition at initialization time and
|
||
|
; other threads looping rather than waiting on a condition when looking
|
||
|
; for work to steal. these looping threads can hog the cpu without
|
||
|
; doing anything useful, causing progress to stall or halt. this
|
||
|
; manifests as occasional indefinite delays under Windows and OpenBSD,
|
||
|
; and it has the potential to cause the same on other operating systems.
|
||
|
; it's not clear how to fix the mat without changing it fundamentally.
|
||
|
#;(eqv?
|
||
|
(let () ; from Ryan Newton
|
||
|
|
||
|
(define-syntax ASSERT
|
||
|
(lambda (x)
|
||
|
(syntax-case x ()
|
||
|
[(_ expr) #'(or expr (errorf 'ASSERT "failed: ~s" (IFCHEZ #'expr (format-syntax #'expr))))]
|
||
|
;; This form is (ASSERT integer? x) returning the value of x.
|
||
|
[(_ fun val) #'(let ([v val])
|
||
|
(if (fun v) v
|
||
|
(errorf 'ASSERT "failed: ~s\n Value which did not satisfy above predicate: ~s"
|
||
|
(IFCHEZ #'fun (format-syntax #'fun))
|
||
|
v)))]
|
||
|
)))
|
||
|
|
||
|
(define test-depth 25) ;; Make a tree with 2^test-depth nodes.
|
||
|
|
||
|
(define vector-build
|
||
|
(lambda (n f)
|
||
|
(let ([v (make-vector n)])
|
||
|
(do ([i 0 (fx+ i 1)])
|
||
|
((= i n) v)
|
||
|
(vector-set! v i (f i))
|
||
|
))))
|
||
|
|
||
|
;; STATE:
|
||
|
|
||
|
;; Each thread's stack has a list of frames, from newest to oldest.
|
||
|
;; We use a lock-free approach for mutating/reading the frame list.
|
||
|
;; Therefore, a thief might steal an old inactive frame, but this poses no problem.
|
||
|
;;
|
||
|
;; A thread's "stack" must be as efficient as possible, because
|
||
|
;; it essentially replaces the native scheme stack where par calls
|
||
|
;; are concerned. (I wonder if continuations can serve any purpose here.)
|
||
|
;; Note, head is the "bottom" and tail is the "top". We add to tail.
|
||
|
(define-record shadowstack (id head tail frames))
|
||
|
|
||
|
;; Frames are locked individually.
|
||
|
;; status may be 'available, 'stolen, or 'done
|
||
|
(define-record shadowframe (mut status oper argval))
|
||
|
|
||
|
;; There's also a global list of threads:
|
||
|
(define allstacks '#()) ;; This is effectively immutable.
|
||
|
(define par-finished #f)
|
||
|
;; And a mutex for global state:
|
||
|
(define global-mut (make-mutex))
|
||
|
(define threads-registered 1)
|
||
|
|
||
|
;; A new stack has no frames, but has a (hopefully) unique ID:
|
||
|
(define (new-stack)
|
||
|
(make-shadowstack (random 10000)
|
||
|
0 ;; Head pointer.
|
||
|
0 ;; Tail pointer.
|
||
|
(vector-build 50
|
||
|
(lambda (_) (make-shadowframe (make-mutex) #f #f #f)))))
|
||
|
|
||
|
;; A per-thread parameter.
|
||
|
(define this-stack (make-thread-parameter (new-stack)))
|
||
|
|
||
|
;; Mutated below:
|
||
|
(define numprocessors #f)
|
||
|
|
||
|
;; We spin until everybody is awake.
|
||
|
(define (wait-for-everybody)
|
||
|
(let wait-for-threads ([n 0])
|
||
|
; attempt to prevent apparent occasional starvation on openbsd
|
||
|
(when (= n 0) (printf " ~s ~s\n" threads-registered numprocessors))
|
||
|
(unless (= threads-registered numprocessors)
|
||
|
(wait-for-threads (mod n 1000)))))
|
||
|
|
||
|
;; DEBUGGING:
|
||
|
;; Pick a print:
|
||
|
; (define (print . args) (with-mutex global-mut (apply printf args) (flush-output-port)))
|
||
|
; (define (print . args) (apply printf args))
|
||
|
(define (print . args) (void)) ;; fizzle
|
||
|
|
||
|
|
||
|
;; ----------------------------------------
|
||
|
|
||
|
(define (init-par num-cpus)
|
||
|
(printf "\n Initializing PAR system for ~s threads.\n" num-cpus)
|
||
|
(with-mutex global-mut
|
||
|
(ASSERT (eq? threads-registered 1))
|
||
|
(set! numprocessors num-cpus)
|
||
|
(set! allstacks (make-vector num-cpus))
|
||
|
(vector-set! allstacks 0 (this-stack))
|
||
|
;; We fork N-1 threads (the original one counts)
|
||
|
(do ([i 1 (fx+ i 1)]) ([= i num-cpus] (void))
|
||
|
(vector-set! allstacks i (make-worker))))
|
||
|
(wait-for-everybody)
|
||
|
(printf "Everyone's awake!\n"))
|
||
|
|
||
|
(define (shutdown-par) (set! par-finished #t))
|
||
|
(define (par-status)
|
||
|
(printf "Par status:\n par-finished ~s\n allstacks: ~s\n stacksizes: ~s\n\n"
|
||
|
par-finished (vector-length allstacks)
|
||
|
(map shadowstack-tail (vector->list allstacks))))
|
||
|
|
||
|
;; ----------------------------------------
|
||
|
|
||
|
;; Try to do work and mark it as done.
|
||
|
(define (steal-work! frame)
|
||
|
(and (eq? 'available (shadowframe-status frame))
|
||
|
(mutex-acquire (shadowframe-mut frame) #f) ;; Don't block on it
|
||
|
;; From here on out we've got the mutex:
|
||
|
(if (eq? 'available (shadowframe-status frame)) ;; If someone beat us here, we fizzle
|
||
|
#t
|
||
|
(begin (mutex-release (shadowframe-mut frame))
|
||
|
#f))
|
||
|
(begin
|
||
|
;;(printf "STOLE work! ~s\n" frame)
|
||
|
(set-shadowframe-status! frame 'stolen)
|
||
|
(mutex-release (shadowframe-mut frame))
|
||
|
;; Then let go to do the real work:
|
||
|
(set-shadowframe-argval! frame
|
||
|
((shadowframe-oper frame) (shadowframe-argval frame)))
|
||
|
(set-shadowframe-status! frame 'done)
|
||
|
#t)))
|
||
|
|
||
|
(define (find-and-steal-once!)
|
||
|
(let* ([ind (random numprocessors)]
|
||
|
[stack (vector-ref allstacks ind)])
|
||
|
(let* ([frames (shadowstack-frames stack)]
|
||
|
[tl (shadowstack-tail stack)])
|
||
|
(let frmloop ([i 0])
|
||
|
(if (fx= i tl)
|
||
|
#f ;; No work on this processor, try again.
|
||
|
(if (steal-work! (vector-ref frames i))
|
||
|
#t
|
||
|
(frmloop (fx+ 1 i))))))))
|
||
|
|
||
|
(define (make-worker)
|
||
|
(define stack (new-stack))
|
||
|
(fork-thread (lambda ()
|
||
|
(this-stack stack) ;; Initialize stack.
|
||
|
(with-mutex global-mut ;; Register our existence.
|
||
|
(set! threads-registered (add1 threads-registered)))
|
||
|
;; Steal work forever:
|
||
|
(let forever ()
|
||
|
(unless par-finished
|
||
|
(find-and-steal-once!)
|
||
|
(forever)))))
|
||
|
stack)
|
||
|
|
||
|
(define-syntax pcall
|
||
|
(syntax-rules ()
|
||
|
[(_ op (f x) e2)
|
||
|
(let ([stack (this-stack)])
|
||
|
(define (push! oper val)
|
||
|
(let ([frame (vector-ref (shadowstack-frames stack) (shadowstack-tail stack))])
|
||
|
;; Initialize the frame
|
||
|
(set-shadowframe-oper! frame oper)
|
||
|
(set-shadowframe-argval! frame val)
|
||
|
(set-shadowframe-status! frame 'available)
|
||
|
(set-shadowstack-tail! stack (fx+ (shadowstack-tail stack) 1))
|
||
|
frame))
|
||
|
(define (pop!) (set-shadowstack-tail! stack (fx- (shadowstack-tail stack) 1)))
|
||
|
|
||
|
(let ([frame (push! f x)])
|
||
|
(let ([val1 e2])
|
||
|
;; We're the parent, when we get to this frame, we lock it off from all other comers.
|
||
|
;; Thieves should do non-blocking probes.
|
||
|
(let waitloop ()
|
||
|
(mutex-acquire (shadowframe-mut frame))
|
||
|
(case (shadowframe-status frame)
|
||
|
[(available)
|
||
|
(set-shadowframe-status! frame 'stolen) ;; Just in case...
|
||
|
(pop!) ;; Pop before we even start the thunk.
|
||
|
(mutex-release (shadowframe-mut frame))
|
||
|
(op ((shadowframe-oper frame) (shadowframe-argval frame))
|
||
|
val1)]
|
||
|
;; Oops, they may be waiting to get back in here and set the result, let's get out quick.
|
||
|
[(stolen)
|
||
|
;; Let go of this so they can finish and mark it as done.
|
||
|
(mutex-release (shadowframe-mut frame))
|
||
|
(find-and-steal-once!) ;; Meanwhile we should go try to make ourselves useful..
|
||
|
(waitloop)] ;; When we're done with that come back and see if our outsourced job is done.
|
||
|
;; It was stolen and is now completed:
|
||
|
[else (pop!)
|
||
|
(mutex-release (shadowframe-mut frame))
|
||
|
(op (shadowframe-argval frame) val1)]))
|
||
|
|
||
|
)))]))
|
||
|
|
||
|
;; Returns values in a list
|
||
|
(define-syntax par
|
||
|
(syntax-rules ()
|
||
|
[(_ a b) (pcall list ((lambda (_) a) #f) b)]))
|
||
|
|
||
|
(define-syntax parmv
|
||
|
(syntax-rules ()
|
||
|
[(_ a b) (pcall values ((lambda (_) a) #f) b)]))
|
||
|
|
||
|
;;================================================================================
|
||
|
|
||
|
(init-par 4)
|
||
|
|
||
|
(let ()
|
||
|
(define (tree n)
|
||
|
(if (fxzero? n) 1
|
||
|
(pcall fx+ (tree (fx- n 1)) (tree (fx- n 1)))))
|
||
|
(let ([n (time (tree test-depth))])
|
||
|
(shutdown-par)
|
||
|
n)))
|
||
|
33554432)
|
||
|
($thread-check)
|
||
|
; make sure thread can return zero values
|
||
|
(eqv?
|
||
|
(let ([x #f])
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(call/cc
|
||
|
(lambda (k)
|
||
|
(set! x 'frob)
|
||
|
(k)
|
||
|
(set! x 'borf)))))
|
||
|
(let f () (unless x ($yield) (f)))
|
||
|
x)
|
||
|
'frob)
|
||
|
($thread-check)
|
||
|
(equal?
|
||
|
(let ()
|
||
|
(define n 10)
|
||
|
(define m (make-mutex))
|
||
|
(define c (make-condition))
|
||
|
(define-ftype A (struct (x double) (y iptr)))
|
||
|
(define a (make-ftype-pointer A (foreign-alloc (ftype-sizeof A))))
|
||
|
(define ls '())
|
||
|
(ftype-set! A (x) a 3.4)
|
||
|
(ftype-set! A (y) a 1)
|
||
|
(with-mutex m
|
||
|
(do ([n n (fx- n 1)])
|
||
|
((fx= n 0))
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(define fat+
|
||
|
(lambda (x y)
|
||
|
(if (zero? y)
|
||
|
x
|
||
|
(fat+ (1+ x) (1- y)))))
|
||
|
(define fatfib
|
||
|
(lambda (x)
|
||
|
(if (< x 2)
|
||
|
1
|
||
|
(fat+ (fatfib (1- x)) (fatfib (1- (1- x)))))))
|
||
|
(do ([i (fx+ 1000 n) (fx- i 1)] [q (fatfib 4) (fatfib 4)])
|
||
|
((fx= i 0)
|
||
|
(with-mutex m
|
||
|
(set! ls (cons q ls))
|
||
|
(condition-signal c)))
|
||
|
(if (odd? n)
|
||
|
(ftype-locked-incr! A (y) a)
|
||
|
(when (ftype-locked-decr! A (y) a)
|
||
|
(printf "woohoo!\n")))))))
|
||
|
(let loop ()
|
||
|
(if (equal? (length ls) n)
|
||
|
(list (ftype-ref A (x) a) (ftype-ref A (y) a))
|
||
|
(begin
|
||
|
(condition-wait c m)
|
||
|
(loop))))))
|
||
|
'(3.4 -4))
|
||
|
($thread-check)
|
||
|
(begin
|
||
|
(load-shared-object (format "~a/foreign1.so" *mats-dir*))
|
||
|
#t)
|
||
|
(equal?
|
||
|
(let ()
|
||
|
(define ff-locked-incr! (foreign-procedure "locked_incr" ((* iptr)) boolean))
|
||
|
(define ff-locked-decr! (foreign-procedure "locked_decr" ((* iptr)) boolean))
|
||
|
(define n 10)
|
||
|
(define m (make-mutex))
|
||
|
(define c (make-condition))
|
||
|
(define-ftype A (struct (x double) (y iptr)))
|
||
|
(define a (make-ftype-pointer A (foreign-alloc (ftype-sizeof A))))
|
||
|
(define ls '())
|
||
|
(ftype-set! A (x) a 3.4)
|
||
|
(ftype-set! A (y) a 1)
|
||
|
(with-mutex m
|
||
|
(do ([n n (fx- n 1)])
|
||
|
((fx= n 0))
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(define fat+
|
||
|
(lambda (x y)
|
||
|
(if (zero? y)
|
||
|
x
|
||
|
(fat+ (1+ x) (1- y)))))
|
||
|
(define fatfib
|
||
|
(lambda (x)
|
||
|
(if (< x 2)
|
||
|
1
|
||
|
(fat+ (fatfib (1- x)) (fatfib (1- (1- x)))))))
|
||
|
(do ([i (fx+ 1000 n) (fx- i 1)] [q (fatfib 10) (fatfib 10)])
|
||
|
((fx= i 0)
|
||
|
(with-mutex m
|
||
|
(set! ls (cons q ls))
|
||
|
(condition-signal c)))
|
||
|
(if (odd? n)
|
||
|
(ff-locked-incr! (ftype-&ref A (y) a))
|
||
|
(when (ff-locked-decr! (ftype-&ref A (y) a))
|
||
|
(printf "hoowoo!\n")))))))
|
||
|
(let loop ()
|
||
|
(if (equal? (length ls) n)
|
||
|
(list (ftype-ref A (x) a) (ftype-ref A (y) a))
|
||
|
(begin
|
||
|
(condition-wait c m)
|
||
|
(loop))))))
|
||
|
'(3.4 -4))
|
||
|
($thread-check)
|
||
|
(eqv?
|
||
|
(let ()
|
||
|
(define n 10)
|
||
|
(define m (make-mutex))
|
||
|
(define c (make-condition))
|
||
|
(define-ftype A (struct (x double) (y iptr)))
|
||
|
(define a (make-ftype-pointer A (foreign-alloc (ftype-sizeof A))))
|
||
|
(define ls '())
|
||
|
(ftype-set! A (x) a 3.5)
|
||
|
(ftype-init-lock! A (y) a)
|
||
|
(with-mutex m
|
||
|
(do ([n n (fx- n 1)])
|
||
|
((fx= n 0))
|
||
|
(fork-thread
|
||
|
(lambda ()
|
||
|
(define fat+
|
||
|
(lambda (x y)
|
||
|
(if (zero? y)
|
||
|
x
|
||
|
(fat+ (1+ x) (1- y)))))
|
||
|
(define fatfib
|
||
|
(lambda (x)
|
||
|
(if (< x 2)
|
||
|
1
|
||
|
(fat+ (fatfib (1- x)) (fatfib (1- (1- x)))))))
|
||
|
(do ([i (+ 1000 n) (fx- i 1)] [q (fatfib 4) (fatfib 4)])
|
||
|
((fx= i 0)
|
||
|
(with-mutex m
|
||
|
(set! ls (cons q ls))
|
||
|
(condition-signal c)))
|
||
|
; disable collections so we don't rendezvous for collection while holding the lock,
|
||
|
; leaving some other thread spinning hopelessly in ftype-spin-lock!
|
||
|
(with-interrupts-disabled
|
||
|
(if (odd? i)
|
||
|
(let loop () (unless (ftype-lock! A (y) a) (printf "waiting\n") (loop)))
|
||
|
(ftype-spin-lock! A (y) a))
|
||
|
(ftype-set! A (x) a ((if (odd? n) + -) (ftype-ref A (x) a) 1.0))
|
||
|
(ftype-unlock! A (y) a))))))
|
||
|
(let loop ()
|
||
|
(if (equal? (length ls) n)
|
||
|
(ftype-ref A (x) a)
|
||
|
(begin
|
||
|
(condition-wait c m)
|
||
|
(loop))))))
|
||
|
-1.5)
|
||
|
($thread-check)
|
||
|
(parameterize ([collect-request-handler void])
|
||
|
(define (expect-error what thunk)
|
||
|
(guard (c [(and (message-condition? c)
|
||
|
(equal? (condition-message c)
|
||
|
(format "~a is defunct" what)))])
|
||
|
(thunk)
|
||
|
(error #f "error expected")))
|
||
|
(let ([g (make-guardian)])
|
||
|
(g (make-mutex))
|
||
|
(collect)
|
||
|
(let ([m (g)])
|
||
|
(expect-error 'mutex (lambda () (mutex-acquire m)))
|
||
|
(expect-error 'mutex (lambda () (mutex-release m)))
|
||
|
(expect-error 'mutex (lambda () (condition-wait (make-condition) m))))
|
||
|
(g (make-condition))
|
||
|
(collect)
|
||
|
(let ([c (g)])
|
||
|
(expect-error 'condition (lambda () (condition-wait c (make-mutex))))
|
||
|
(expect-error 'condition (lambda () (condition-broadcast c)))
|
||
|
(expect-error 'condition (lambda () (condition-signal c))))))
|
||
|
)
|
||
|
|
||
|
(mat make-thread-parameter
|
||
|
(begin (define p (make-thread-parameter #f not)) #t)
|
||
|
(p)
|
||
|
(begin (p #f) (p))
|
||
|
(begin (p #t) (not (p)))
|
||
|
(begin (define q (make-thread-parameter #t)) #t)
|
||
|
(q)
|
||
|
(begin (q #f) (not (q)))
|
||
|
(begin (q #t) (q))
|
||
|
(or (= (optimize-level) 3)
|
||
|
(guard (c [(and (message-condition? c)
|
||
|
(equal? (condition-message c) "~s is not a procedure")
|
||
|
(irritants-condition? c)
|
||
|
(equal? (condition-irritants c) (list 2)))])
|
||
|
(make-thread-parameter 1 2)))
|
||
|
(begin
|
||
|
(define p
|
||
|
(make-thread-parameter 5
|
||
|
(lambda (x) (+ x 1))))
|
||
|
#t)
|
||
|
(eqv? (p) 6)
|
||
|
(or (= (optimize-level) 3)
|
||
|
(guard (c [(and (message-condition? c)
|
||
|
(equal? (condition-message c) "~s is not a number")
|
||
|
(irritants-condition? c)
|
||
|
(equal? (condition-irritants c) (list 'a)))])
|
||
|
(p 'a)))
|
||
|
)
|
||
|
|
||
|
(mat cas
|
||
|
(begin
|
||
|
(define (check container container-ref container-cas!)
|
||
|
(let ([N 1000]
|
||
|
[M 4]
|
||
|
[done 0]
|
||
|
[m (make-mutex)]
|
||
|
[c (make-condition)])
|
||
|
(define (bump)
|
||
|
(let loop ([i 0])
|
||
|
(unless (= i N)
|
||
|
(let ([v (container-ref container)])
|
||
|
(if (container-cas! container v (add1 v))
|
||
|
(loop (add1 i))
|
||
|
(loop i)))))
|
||
|
(mutex-acquire m)
|
||
|
(set! done (add1 done))
|
||
|
(condition-signal c)
|
||
|
(mutex-release m))
|
||
|
(let loop ([j 0])
|
||
|
(when (< j M)
|
||
|
(fork-thread bump)
|
||
|
(loop (add1 j))))
|
||
|
(mutex-acquire m)
|
||
|
(let loop ()
|
||
|
(cond
|
||
|
[(= done M)
|
||
|
(mutex-release m)]
|
||
|
[else
|
||
|
(condition-wait c m)
|
||
|
(loop)]))
|
||
|
(= (container-ref container) (* M N))))
|
||
|
(check (box 0) unbox box-cas!))
|
||
|
(check (vector 1 0 2) (lambda (v) (vector-ref v 1)) (lambda (v o n) (vector-cas! v 1 o n))))
|
||
|
; give the bump threads 0.2 s to finish
|
||
|
(sleep (make-time 'time-duration 200000000 0))
|
||
|
)
|