;;; thread.ms ;;; Copyright 1984-2017 Cisco Systems, Inc. ;;; ;;; Licensed under the Apache License, Version 2.0 (the "License"); ;;; you may not use this file except in compliance with the License. ;;; You may obtain a copy of the License at ;;; ;;; http://www.apache.org/licenses/LICENSE-2.0 ;;; ;;; Unless required by applicable law or agreed to in writing, software ;;; distributed under the License is distributed on an "AS IS" BASIS, ;;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ;;; See the License for the specific language governing permissions and ;;; limitations under the License. ; why can't this come after misc.ms? (define-syntax when-threaded (lambda (x) (syntax-case x () [(_ e ...) (if (threaded?) #'(begin (void) e ...) #'(void))]))) (mat engine-thread (let () (define fattercode '(module (fatterfib fatter slimmer zero) (define zero? null?) (define 1+ list) (define 1- car) (define zero '()) (define (fatter n) (if (= n 0) zero (1+ (fatter (- n 1))))) (define (slimmer x) (if (zero? x) 0 (+ (slimmer (1- x)) 1))) (define fatter+ (lambda (x y) (if (zero? y) x (fatter+ (1+ x) (1- y))))) (define fatterfib (lambda (x) (if (or (zero? x) (zero? (1- x))) (1+ zero) (fatter+ (fatterfib (1- x)) (fatterfib (1- (1- x))))))))) (define fatter-eval (lambda (eval loc) (lambda () (let f ([n 10] [a 0]) (if (= n 0) (set-car! loc a) (f (- n 1) (+ a (eval `(let () ,fattercode (slimmer (fatterfib (fatter 10)))))))))))) (define (engine-eval-test n) (define thread-list '()) (define fork-thread (lambda (x) (set! thread-list (cons (make-engine x) thread-list)))) (define locs (map list (make-list n 0))) (for-each (lambda (x) (fork-thread (fatter-eval interpret x))) locs) (let f ([q thread-list]) (unless (null? q) ((car q) 10 (lambda (t . r) (f (cdr q))) (lambda (x) (f (append (cdr q) (list x))))))) (map car locs)) (equal? (engine-eval-test 7) '(890 890 890 890 890 890 890))) ) (when-threaded (mat thread (let ([m (make-mutex)] [c (make-condition)] [m2 (make-mutex 'mname)] [c2 (make-condition 'cname)]) (and (mutex? m) (thread-condition? c) (mutex? m2) (thread-condition? c2) (not (mutex? c)) (not (thread-condition? m)) (not (mutex? c2)) (not (thread-condition? m2)) (not (mutex-name m)) (not (condition-name c)) (eq? 'mname (mutex-name m2)) (eq? 'cname (condition-name c2)) (not (mutex? 'mutex)) (not (thread-condition? 'condition)))) (begin (define $fib (lambda (x) (if (< x 2) 1 (+ ($fib (- x 1)) ($fib (- x 2)))))) (let-syntax ([mats-dir-relative (lambda (x) (syntax-case x () [(_ (include ?path)) (string? (datum ?path)) #`(include #,(format "~a/~a" *mats-dir* (datum ?path)))]))]) (mats-dir-relative (include "../../mats/thread-check.ss"))) (define $time-in-range? (lambda (start stop target) (let ([t (time-difference stop start)]) (<= (abs (- (+ (time-second t) (* (time-nanosecond t) 1e-9)) target)) 0.2)))) (andmap procedure? (list $threads $fib $thread-check $time-in-range?))) ($thread-check) (not (= (let ([n #f]) (fork-thread (lambda () (set! n (get-thread-id)))) (let f () (if n n (begin ($yield) (f))))) 0)) ($thread-check) (= (let ([n #f]) (fork-thread (lambda () (set! n (get-process-id)))) (let f () (if n n (begin ($yield) (f))))) (get-process-id)) ($thread-check) (equal? (let () (define fat+ (lambda (x y) (if (zero? y) x (fat+ (1+ x) (1- y))))) (define fatfib (lambda (x) (if (< x 2) 1 (fat+ (fatfib (1- x)) (fatfib (1- (1- x))))))) (define iota (case-lambda [(n) (iota 0 n)] [(i n) (if (= n 0) '() (cons i (iota (+ i 1) (- n 1))))])) (define-syntax parallel-list (syntax-rules () [(_ x ...) (let ([v (make-vector (length '(x ...)) #f)] [m (make-mutex)] [c (make-condition)] [n (length '(x ...))]) (map (lambda (p i) (p i)) (list (lambda (i) (fork-thread (lambda () (vector-set! v i x) (with-mutex m (set! n (- n 1)) (when (= n 0) (condition-signal c)))))) ...) (iota (length '(x ...)))) (and (with-mutex m (condition-wait c m (make-time 'time-duration 0 60))) (vector->list v)))])) (parallel-list (fatfib 26) (fatfib 27) (fatfib 28) (fatfib 29) (fatfib 30) (fatfib 31))) '(196418 317811 514229 832040 1346269 2178309)) ($thread-check) (let ([m (make-mutex)] [c (make-condition)]) (with-mutex m (let* ([start (current-time)] [r (condition-wait c m (make-time 'time-duration 250000000 1))] [stop (current-time)]) (and (not r) ($time-in-range? start stop 1.25))))) (let ([m (make-mutex)] [c (make-condition)]) (with-mutex m (let* ([start (current-time)] [r (condition-wait c m (add-duration start (make-time 'time-duration 250000000 1)))] [stop (current-time)]) (and (not r) ($time-in-range? start stop 1.25))))) (let ([m (make-mutex)] [c (make-condition)]) (with-mutex m (let* ([start (current-time)] [r (condition-wait c m (make-time 'time-duration 0 -1))] [stop (current-time)]) (and (not r) ($time-in-range? start stop 0.0))))) (let ([m (make-mutex)] [c (make-condition)]) (with-mutex m (let* ([start (current-time)] [r (condition-wait c m (add-duration start (make-time 'time-duration 0 -1)))] [stop (current-time)]) (and (not r) ($time-in-range? start stop 0.0))))) (let ([m (make-mutex)] [c (make-condition)]) (with-mutex m (fork-thread (lambda () (with-mutex m (sleep (make-time 'time-duration 250000000 0))))) (let* ([start (current-time)] [r (condition-wait c m (make-time 'time-duration 100000000 0))] [stop (current-time)]) (and (not r) ($time-in-range? start stop 0.25))))) (let ([count 300] [live 0] [live-m (make-mutex)]) (parameterize ([collect-request-handler (lambda () (set! count (- count 1)) (collect))]) (define chew (lambda (x) (when (> count 0) (chew (list 0))))) (define (gc-test n) (set! live n) (do ([i 0 (+ i 1)]) ((= i n)) (fork-thread (lambda () (chew 0) (with-mutex live-m (set! live (- live 1))))))) (gc-test 4) (chew 0)) ; wait for the others to die (let f () (unless (= live 0) ($yield) (f))) #t) ($thread-check) (let ([count 300] [live 0] [live-m (make-mutex)]) (parameterize ([collect-request-handler (lambda () (set! count (- count 1)) (collect))]) (define chew (lambda (x) (when (> count 0) (chew (list 0))))) (define (gc-test n) (set! live n) (do ([i 0 (+ i 1)]) ((= i n)) (fork-thread (lambda () (chew 0) (with-mutex live-m (set! live (- live 1))))))) (let ([m1 (make-mutex)] [m2 (make-mutex)] [c1 (make-condition)] [c2 (make-condition)]) ; suspend one thread on a condition, waiting for it to get there (with-mutex m1 (fork-thread (lambda () (with-mutex m1 (condition-signal c1) (condition-wait c1 m1) (condition-signal c1)))) (condition-wait c1 m1)) ; try to get another suspended on a mutex (mutex-acquire m2) (fork-thread (lambda () (with-mutex m2 (condition-signal c2)))) ; start some threads a-allocating (gc-test 3) ; join in the fun (chew 0) ; release the suspended threads (with-mutex m1 (condition-signal c1) (condition-wait c1 m1)) (condition-wait c2 m2) (mutex-release m2))) ; wait for the others to die (let f () (unless (= live 0) ($yield) (f))) #t) ($thread-check) (let ([count 300] [live 0] [live-m (make-mutex)]) (define fat+ (lambda (x y) (if (zero? y) x (fat+ (1+ x) (1- y))))) (define fatfib (lambda (x) (if (< x 2) 1 (fat+ (fatfib (1- x)) (fatfib (1- (1- x))))))) (parameterize ([collect-request-handler (lambda () (set! count (- count 1)) (collect))]) (define chew (lambda (x) (when (> count 0) (chew (list 0))))) (define (gc-test n) (set! live n) (do ([i 0 (+ i 1)]) ((= i n)) (fork-thread (lambda () (chew 0) (with-mutex live-m (set! live (- live 1))))))) (let ([m1 (make-mutex)] [m2 (make-mutex)] [c1 (make-condition)] [c2 (make-condition)]) ; suspend one thread on a condition, waiting for it to get there (with-mutex m1 (fork-thread (lambda () (with-mutex m1 (condition-signal c1) (condition-wait c1 m1) (condition-signal c1)))) (condition-wait c1 m1)) ; try to get another suspended on a mutex (mutex-acquire m2) (fork-thread (lambda () (with-mutex m2 (condition-signal c2)))) ; start some threads a-allocating (gc-test 5) ; create threads that die quickly while waiting for ; our count collections to occur (let ([live 0] [live-m (make-mutex)]) (let f () (when (> count 0) (fatfib 20) (with-mutex live-m ; cap the number of quickly dying threads, or ti3nb ; runs out of memory somewhere later in the mat run. ; theory is that virtual address space gets chopped ; up and that the main thread can't access virtual ; memory previously assigned to a child thread. (if (< live 20) ; don't panic if we try to create too many... (guard (c [#t (values)]) (fork-thread (lambda () (with-mutex live-m (set! live (- live 1))))) (set! live (+ live 1))))) (f)))) ; release the suspended threads (with-mutex m1 (condition-signal c1) (condition-wait c1 m1)) (condition-wait c2 m2) (mutex-release m2))) ; wait for the others to die (let f () (unless (= live 0) ($yield) (f))) #t) ($thread-check) (let ([count 30]) (parameterize ([collect-request-handler (lambda () (set! count (- count 1)) (collect))]) (define chew (lambda (x) (when (> count 0) (chew (list 0))))) (define (gc-test4 n) (define fib (lambda (x) (if (< x 2) 1 (+ (fib (- x 1)) (fib (- x 2)))))) ; stall collection for a bit (disable-interrupts) (do ([i 0 (+ i 1)]) ((= i n)) (fork-thread (lambda () (enable-interrupts) (chew 0)))) (fib 35) (enable-interrupts)) (gc-test4 4) (chew 0) #t)) ($thread-check) (eqv? (let () (define m (make-mutex)) (define fib (lambda (n) (if (with-mutex m (< n 2)) 1 (+ (fib (- n 1)) (fib (- n 2)))))) (fib 20)) 10946) ($thread-check) (let () (define m (make-mutex)) (define mgrab (lambda (n) (unless (fxzero? n) (mutex-acquire m) (mutex-release m) (mgrab (fx- n 1))))) (mgrab 100) #t) ($thread-check) (let () (define m (make-mutex)) (define recmgrab (lambda (n) (set! m (make-mutex)) (let f ([n n]) (unless (fxzero? n) (mutex-acquire m) (f (fx- n 1)) (mutex-release m))) (fork-thread (lambda () (mutex-acquire m) #;(pretty-print 'bye))))) (recmgrab 100) #t) ($thread-check) (eqv? (let ([cnt 0] [ans 0]) (define m (make-mutex)) (mutex-acquire m) (fork-thread (rec f (lambda () (with-mutex m (set! cnt (+ cnt 1))) (if (= ans 0) (begin ($yield) (f)) (set! ans (- ans)))))) (mutex-release m) (let f () (when (= cnt 0) (begin ($yield) (f)))) (set! ans 17) (let f () (if (< ans 0) ans (begin ($yield) (f))))) -17) ($thread-check) (eqv? (let () (define ans 0) (define cnt 0) (define m (make-mutex)) (define c (make-condition)) (define (f n) (fork-thread (lambda () (with-mutex m (set! ans (+ ans n)) (set! cnt (+ cnt 1)) (condition-wait c m) (set! cnt (- cnt 1)))))) (f 1) (f 2) (f 4) (let f () (unless (= cnt 3) (begin ($yield) (f)))) (with-mutex m (condition-broadcast c)) (let f () (unless (= cnt 0) (begin ($yield) (f)))) ans) 7) ($thread-check) (let ([nthreads (length ($threads))]) (define-record bounded-queue (i) ([vec (make-vector i)] [mutex (make-mutex)] [ready (make-condition)] [room (make-condition)] [waiting 0] [die? #f])) (define queue-empty? (lambda (bq) (with-mutex (bounded-queue-mutex bq) (= (bounded-queue-i bq) (vector-length (bounded-queue-vec bq)))))) (define enqueue! (lambda (item bq) (let ([mutex (bounded-queue-mutex bq)]) (with-mutex mutex (let loop () (unless (bounded-queue-die? bq) (if (zero? (bounded-queue-i bq)) (begin (condition-wait (bounded-queue-room bq) mutex) ; we grab the mutex when we wake up, but some other thread may ; beat us to the punch (loop)) (let ([i (- (bounded-queue-i bq) 1)]) (vector-set! (bounded-queue-vec bq) i item) (set-bounded-queue-i! bq i) (unless (zero? (bounded-queue-waiting bq)) (condition-signal (bounded-queue-ready bq))))))))))) (define dequeue! (lambda (bq) (let ([mutex (bounded-queue-mutex bq)]) (with-mutex mutex (let loop () (unless (bounded-queue-die? bq) (if (= (bounded-queue-i bq) (vector-length (bounded-queue-vec bq))) (begin (set-bounded-queue-waiting! bq (+ (bounded-queue-waiting bq) 1)) (condition-wait (bounded-queue-ready bq) mutex) (set-bounded-queue-waiting! bq (- (bounded-queue-waiting bq) 1)) ; we grab the mutex when we wake up, but some other thread may ; beat us to the punch (loop)) (let ([i (bounded-queue-i bq)]) (let ([item (vector-ref (bounded-queue-vec bq) i)]) (set-bounded-queue-i! bq (+ i 1)) (condition-signal (bounded-queue-room bq)) item))))))))) (define job-queue) (define fib (lambda (n) (if (< n 2) n (+ (fib (- n 2)) (fib (- n 1)))))) (define make-job (let ([count 0]) (lambda (n) (set! count (+ count 1)) (printf "Adding job #~s = (lambda () (fib ~s))\n" count n) (cons count (lambda () (fib n)))))) (define make-producer (lambda (n) (rec producer (lambda () (printf "producer ~s posting a job\n" n) (enqueue! (make-job (+ 20 (random 10))) job-queue) (if (bounded-queue-die? job-queue) (printf "producer ~s dying\n" n) (producer)))))) (define make-consumer (lambda (n) (rec consumer (lambda () (printf "consumer ~s looking for a job~%" n) (let ([job (dequeue! job-queue)]) (if (bounded-queue-die? job-queue) (printf "consumer ~s dying\n" n) (begin (printf "consumer ~s executing job #~s~%" n (car job)) (printf "consumer ~s computed: ~s~%" n ((cdr job))) (consumer)))))))) (define (bq-test np nc) (set! job-queue (make-bounded-queue (max nc np))) (do ([np np (- np 1)]) ((<= np 0)) (fork-thread (make-producer np))) (do ([nc nc (- nc 1)]) ((<= nc 0)) (fork-thread (make-consumer nc)))) (bq-test 3 4) (set! ans (fib 35)) ; flush out the waiting producers and consumers (set-bounded-queue-die?! job-queue #t) (let () (let f () (unless (= (length ($threads)) nthreads) (with-mutex (bounded-queue-mutex job-queue) (condition-signal (bounded-queue-room job-queue)) (condition-signal (bounded-queue-ready job-queue))) ($yield) (f)))) (eqv? ans 9227465)) ($thread-check) (let ([ans #f]) (define fattercode '(module (fatterfib fatter slimmer zero) (define zero? null?) (define 1+ list) (define 1- car) (define zero '()) (define (fatter n) (if (= n 0) zero (1+ (fatter (- n 1))))) (define (slimmer x) (if (zero? x) 0 (+ (slimmer (1- x)) 1))) (define fatter+ (lambda (x y) (if (zero? y) x (fatter+ (1+ x) (1- y))))) (define fatterfib (lambda (x) (if (or (zero? x) (zero? (1- x))) (1+ zero) (fatter+ (fatterfib (1- x)) (fatterfib (1- (1- x))))))))) (define fatter-eval (lambda (eval loc) (lambda () (let f ([n 10] [a 0]) (if (= n 0) (set-car! loc a) (f (- n 1) (+ a (eval `(let () ,fattercode (slimmer (fatterfib (fatter 10)))))))))))) (define (thread-eval-test n) (define locs (map list (make-list n 0))) (for-each ; compiler is not reentrant wrt threads, so must use interpret (lambda (loc) (fork-thread (fatter-eval interpret loc))) locs) (let f () (when (ormap zero? (map car locs)) ($yield) (f))) (map car locs)) (equal? (thread-eval-test 7) '(890 890 890 890 890 890 890))) ($thread-check) (equal? (let ([x1 #f] [x2 #f] [x3 #f] [x4 #f] [x5 #f]) (define p (make-thread-parameter 0)) (set! x1 (p)) (p 3) (set! x2 (p)) (fork-thread (lambda () (set! x3 (p)) (p 5) (set! x4 (p)))) (let f () (unless x4 ($yield) (f))) (set! x5 (p)) (list x1 x2 x3 x4 x5)) '(0 3 3 5 3)) ($thread-check) (equal? (let ([x1 #f] [x2 #f] [x3 #f] [x4 #f] [x5 #f]) (define p (make-thread-parameter 0)) (define m (make-mutex)) (define c (make-condition)) (set! x1 (p)) (p 3) (set! x2 (p)) (with-mutex m (fork-thread (lambda () (set! x3 (p)) (p 5) (set! x4 (p)) (with-mutex m (condition-signal c)))) (condition-wait c m)) (set! x5 (p)) (list x1 x2 x3 x4 x5)) '(0 3 3 5 3)) ($thread-check) (let () (define done? #f) (define m (make-mutex)) (define spin (lambda () (unless done? (if (mutex-acquire m #f) (set! done? #t) (begin ($yield) (spin)))))) (fork-thread spin) (fork-thread spin) (spin) done?) ($thread-check) (eqv? (let () (define-record accumulator () ([acc 0] [increments 0] [(immutable mutex) (make-mutex)])) (define incr! (lambda (a n) (with-mutex (accumulator-mutex a) (set-accumulator-acc! a (+ (accumulator-acc a) n)) (set-accumulator-increments! a (+ (accumulator-increments a) 1))))) (define fact (lambda (x) (if (= x 0) 1 (* x (fact (- x 1)))))) (define foo (lambda (x) (string-length (number->string (fact x))))) (let () (define acc (make-accumulator)) (define bar (lambda (x) (fork-thread (lambda () (incr! acc (foo x)))))) (bar 1100) (bar 1200) (bar 1300) (bar 1400) (bar 1500) (bar 1400) (bar 1300) (bar 1200) (bar 1100) (let f () (unless (= (accumulator-increments acc) 9) ($yield) (f))) (accumulator-acc acc))) 30777) ($thread-check) (let () (define fat+ (lambda (x y) (if (zero? y) x (fat+ (1+ x) (1- y))))) (define fatfib (lambda (x) (if (< x 2) 1 (fat+ (fatfib (1- x)) (fatfib (1- (1- x))))))) (set! ans (fatfib 30)) (collect) (eqv? ans 1346269)) ($thread-check) (eq? (let () (define bt (foreign-procedure "(cs)backdoor_thread" (scheme-object) iptr)) (let f ([n 100]) (let ([q 0]) (unless (= n 0) (let ([p (lambda () (let ([count 10]) (define chew (lambda (x) (when (> count 0) (chew (list 0))))) (parameterize ([collect-request-handler (lambda () (set! count (- count 1)) (collect))]) (chew 0))) (set! q (+ q 7)))]) (lock-object p) (bt p) (let f () (when (= q 0) ($yield) (f))) (let f () (unless (= (length ($threads)) 1) ($yield) (f))) (unlock-object p)) (unless (= q 14) (errorf #f "~s isn't 14" q)) (f (- n 1))))) 'cool) 'cool) ($thread-check) (let () (define m (make-mutex)) (define c (make-condition)) (define nthreads 4) (define saved-condition #f) (with-mutex m (let f ([n nthreads]) (unless (= n 0) (fork-thread (lambda () (guard (c [else (set! saved-condition c)]) (let g ([k 1000]) (unless (= k 0) (let ([op (open-file-output-port (format "testfile~s.ss" n) (file-options replace))]) (port-file-compressed! op) (put-u8 op 104) (close-port op)) (g (- k 1))))) (with-mutex m (set! nthreads (- nthreads 1)) (when (= nthreads 0) (condition-signal c))))) (f (- n 1)))) (condition-wait c m)) (when saved-condition (raise saved-condition)) #t) ($thread-check) (let () (define m (make-mutex)) (define c (make-condition)) (define nthreads 4) (define saved-condition #f) (with-mutex m (let f ([n nthreads]) (unless (= n 0) (fork-thread (lambda () (guard (c [else (set! saved-condition c)]) (let g ([k 1000]) (unless (= k 0) (let ([ip (open-file-input-port (format "testfile~s.ss" n))]) (port-file-compressed! ip) (let ([b (get-u8 ip)]) (unless (eqv? b 104) (error #f "thread ~s read wrong value ~s" n b))) (close-port ip)) (g (- k 1))))) (with-mutex m (set! nthreads (- nthreads 1)) (when (= nthreads 0) (condition-signal c))))) (f (- n 1)))) (condition-wait c m)) (when saved-condition (raise saved-condition)) #t) ($thread-check) (begin (module ($tt-count $tt-done!) (define n 0) (define done-mutex (make-mutex)) (define $tt-count (lambda () n)) (define ($tt-done!) (with-mutex done-mutex (set! n (+ n 1))))) (define $tt-spam (make-thread-parameter 'main)) (define $tt-run (make-thread-parameter void)) (define $tt-global-mutex (make-mutex)) (define $tt-fat (make-thread-parameter (let f ([n 100] [ls (oblist)]) (define (pick-rem ls) (let f ([ls ls] [i (random (length ls))]) (if (fx= i 0) (values (car ls) (cdr ls)) (let-values ([(x d) (f (cdr ls) (fx- i 1))]) (values x (cons (car ls) d)))))) (if (= n 0) '() (let-values ([(x ls) (pick-rem ls)]) (cons x (f (- n 1) ls))))))) ; original test used copy-environment, which we don't use because ; environments are never collected and we don't want the remaining ; mats to take forever to run because the heap is full of lots of ; unreclaimable junk. So we simulate some of the chores of copying ; an environment (define ($tt-chew ls) (let ([g* (map (lambda (x) (let ([g (gensym)]) (putprop g '*cte* (cons 'alias x)) (when (#%$top-level-bound? x) (#%$set-top-level-value! g (#%$top-level-value x))) g)) ls)]) (map (lambda (g) (cdr (getprop g '*cte*))) g*))) (with-output-to-file "testfile.ss" (lambda () (pretty-print '($tt-run (lambda () (do ([i 2 (1- i)]) ((= i 0)) (with-mutex $tt-global-mutex (printf "thread ~s iteration ~s\n" ($tt-spam) i)) (with-mutex $tt-global-mutex ($tt-fat ($tt-chew ($tt-fat))))))))) 'replace) (parameterize ([collect-request-handler (lambda () (printf "~s collecting\n" ($tt-spam)) (collect))] [collect-notify #t]) (time (do ([i 15 (1- i)]) ((= i 0)) (printf "Pass ~a~%" i) (time (let* ([thread-count 25] #;[compile-mutex (make-mutex)] [thread-finished-mutex (make-mutex)] [thread-finished-count 0] [program-finished-condition (make-condition)] [console-mutex $tt-global-mutex] [saved-condition #f]) (do ([i thread-count (1- i)]) ((= i 0)) (mutex-acquire $tt-global-mutex) (parameterize ([$tt-fat ($tt-chew ($tt-fat))]) (mutex-release $tt-global-mutex) (fork-thread (lambda () (guard (c [else (set! saved-condition c)]) ($tt-spam i) (with-mutex console-mutex (printf "Starting thread ~s...~%" i)) ; mutex should no longer be needed as of v7.3 #;(with-mutex compile-mutex (load "testfile.ss")) ; let's see if we can load from source w/o a mutex (load "testfile.ss") (($tt-run)) ; now let's see if we can compile and load object code ; w/o a mutex (compile-file "testfile.ss" (format "testfile.~s" i)) (load (format "testfile.~s" i)) (($tt-run)) (with-mutex console-mutex (printf "Finished ~s.~%" i))) ($tt-done!) (with-mutex thread-finished-mutex (set! thread-finished-count (1+ thread-finished-count)) (when (= thread-finished-count thread-count) (condition-signal program-finished-condition))))))) (with-mutex thread-finished-mutex (unless (= thread-finished-count thread-count) (condition-wait program-finished-condition thread-finished-mutex))) ($tt-done!) (when saved-condition (raise saved-condition))))))) (eqv? ($tt-count) 390)) ($thread-check) ; following tests garbage collection of code created and run by multiple ; threads. gc is done by one thread. will the others find their instruction ; and data caches properly synchronized? (let ([thread-count 2] [iterations 10000]) (equal? (parameterize ([collect-trip-bytes (expt 2 15)] [collect-generation-radix 1]) (let ([out '()] [out-mutex (make-mutex)] [out-condition (make-condition)] [error? #f]) (do ([i 0 (+ i 1)]) ((= i thread-count)) (fork-thread (lambda () (guard (c [#t (with-mutex out-mutex (set! error? c))]) (let ([n (eval `(let () (define zero? null?) (define 1+ list) (define 1- car) (define zero '()) (define (fatter n) (if (= n 0) zero (1+ (fatter (- n 1))))) (define (slimmer x) (if (zero? x) 0 (+ (slimmer (1- x)) 1))) (define fatter+ (lambda (x y) (if (zero? y) x (fatter+ (1+ x) (1- y))))) (define fatterfib (lambda (x) (if (or (zero? x) (zero? (1- x))) (1+ zero) (fatter+ (fatterfib (1- x)) (fatterfib (1- (1- x))))))) (let loop ([n ,iterations] [a 0]) (if (= n 0) a (loop (- n 1) (+ a (slimmer (fatterfib (fatter 10)))))))))]) (with-mutex out-mutex (set! out (cons n out)) (condition-signal out-condition))))))) (let f () (printf "waiting for ~s more thread(s)\n" (- thread-count (length out))) (with-mutex out-mutex (unless (or error? (= (length out) thread-count) (= (length ($threads)) $nthreads)) (condition-wait out-condition out-mutex))) (cond [error? (raise error?)] [(or (= (length out) thread-count) (= (length ($threads)) $nthreads)) (printf "done\n") out] [else (f)])))) (make-list thread-count (* 89 iterations)))) ($thread-check) ; following tries to verify proper synchronization when top-level exports ; build up system property list. need a module with at least 21 exports to ; force syntax.ss to consult property list for imports (let ([thread-count 20] [iterations 100] [syms '(a1 a2 a3 a4 a5 a6 a7 a8 a9 a10 a11 a12 a13 a14 a15 a16 a17 a18 a19 a20 a21 a22 a23 a24)]) (equal? (let ([out '()] [out-mutex (make-mutex)] [error? #f]) (do ([i 0 (+ i 1)]) ((= i thread-count)) (fork-thread (lambda () (guard (c [#t (set! error? c)]) (do ([j 0 (+ j 1)]) ((= j iterations)) (eval `(letrec-syntax ([frob (syntax-rules () [(_ m r) (frobh m r ,syms ,syms ())])] [frobh (syntax-rules () [(_ m r (s ...) () (q ...)) (module m (r q ...) (define-syntax r (identifier-syntax (list ,(+ (* i thread-count) j) q ...))) (define q 's) ...)] [(_ m r (a ...) (p0 p ...) (q ...)) (frobh m r (a ...) (p ...) ($tx q ...))])]) (frob ,(string->symbol (format "m~s-~s" i j)) $tx))) (let ([v (eval `(let () (import ,(string->symbol (format "m~s-~s" i j))) $tx))]) (with-mutex out-mutex (set! out (cons v out))))))))) (let f ([n 0]) (cond [error? (raise error?)] [(or (= (length out) (* thread-count iterations)) (= (length ($threads)) $nthreads)) (sort (lambda (x y) (< (car x) (car y))) out)] [else (when (= (modulo n 100000) 0) (printf "waiting ~s ~s\n" n (length out))) (f (+ n 1))]))) (sort (lambda (x y) (< (car x) (car y))) (let f ([i 0]) (if (= i thread-count) '() (let g ([j 0]) (if (= j iterations) (f (+ i 1)) (cons (cons (+ (* i thread-count) j) syms) (g (+ j 1)))))))))) ($thread-check) (let ([thread-count 20] [iterations 100]) (equal? (let ([out '()] [out-mutex (make-mutex)] [error? #f]) (do ([i 0 (+ i 1)]) ((= i thread-count)) (fork-thread (lambda () (guard (c [#t (set! error? c)]) (do ([j 0 (+ j 1)]) ((= j iterations)) (eval `(letrec-syntax ([frob (syntax-rules () [(_ a) (begin (define-syntax $ty (identifier-syntax ,(+ (* i thread-count) j))) (define-syntax a (identifier-syntax $ty)))])]) (frob ,(string->symbol (format "a~s-~s" i j))))) (let ([v (eval (string->symbol (format "a~s-~s" i j)))]) (with-mutex out-mutex (set! out (cons v out))))))))) (let f ([n 0]) (cond [error? (raise error?)] [(or (= (length out) (* thread-count iterations)) (= (length ($threads)) $nthreads)) (sort < out)] [else (when (= (modulo n 100000) 0) (printf "waiting ~s ~s\n" n (length out))) (f (+ n 1))]))) (sort < (let f ([i 0]) (if (= i thread-count) '() (let g ([j 0]) (if (= j iterations) (f (+ i 1)) (cons (+ (* i thread-count) j) (g (+ j 1)))))))))) ($thread-check) ; this mat has some inherent starvation issues, with the main thread ; looping rather than waiting on a condition at initialization time and ; other threads looping rather than waiting on a condition when looking ; for work to steal. these looping threads can hog the cpu without ; doing anything useful, causing progress to stall or halt. this ; manifests as occasional indefinite delays under Windows and OpenBSD, ; and it has the potential to cause the same on other operating systems. ; it's not clear how to fix the mat without changing it fundamentally. #;(eqv? (let () ; from Ryan Newton (define-syntax ASSERT (lambda (x) (syntax-case x () [(_ expr) #'(or expr (errorf 'ASSERT "failed: ~s" (IFCHEZ #'expr (format-syntax #'expr))))] ;; This form is (ASSERT integer? x) returning the value of x. [(_ fun val) #'(let ([v val]) (if (fun v) v (errorf 'ASSERT "failed: ~s\n Value which did not satisfy above predicate: ~s" (IFCHEZ #'fun (format-syntax #'fun)) v)))] ))) (define test-depth 25) ;; Make a tree with 2^test-depth nodes. (define vector-build (lambda (n f) (let ([v (make-vector n)]) (do ([i 0 (fx+ i 1)]) ((= i n) v) (vector-set! v i (f i)) )))) ;; STATE: ;; Each thread's stack has a list of frames, from newest to oldest. ;; We use a lock-free approach for mutating/reading the frame list. ;; Therefore, a thief might steal an old inactive frame, but this poses no problem. ;; ;; A thread's "stack" must be as efficient as possible, because ;; it essentially replaces the native scheme stack where par calls ;; are concerned. (I wonder if continuations can serve any purpose here.) ;; Note, head is the "bottom" and tail is the "top". We add to tail. (define-record shadowstack (id head tail frames)) ;; Frames are locked individually. ;; status may be 'available, 'stolen, or 'done (define-record shadowframe (mut status oper argval)) ;; There's also a global list of threads: (define allstacks '#()) ;; This is effectively immutable. (define par-finished #f) ;; And a mutex for global state: (define global-mut (make-mutex)) (define threads-registered 1) ;; A new stack has no frames, but has a (hopefully) unique ID: (define (new-stack) (make-shadowstack (random 10000) 0 ;; Head pointer. 0 ;; Tail pointer. (vector-build 50 (lambda (_) (make-shadowframe (make-mutex) #f #f #f))))) ;; A per-thread parameter. (define this-stack (make-thread-parameter (new-stack))) ;; Mutated below: (define numprocessors #f) ;; We spin until everybody is awake. (define (wait-for-everybody) (let wait-for-threads ([n 0]) ; attempt to prevent apparent occasional starvation on openbsd (when (= n 0) (printf " ~s ~s\n" threads-registered numprocessors)) (unless (= threads-registered numprocessors) (wait-for-threads (mod n 1000))))) ;; DEBUGGING: ;; Pick a print: ; (define (print . args) (with-mutex global-mut (apply printf args) (flush-output-port))) ; (define (print . args) (apply printf args)) (define (print . args) (void)) ;; fizzle ;; ---------------------------------------- (define (init-par num-cpus) (printf "\n Initializing PAR system for ~s threads.\n" num-cpus) (with-mutex global-mut (ASSERT (eq? threads-registered 1)) (set! numprocessors num-cpus) (set! allstacks (make-vector num-cpus)) (vector-set! allstacks 0 (this-stack)) ;; We fork N-1 threads (the original one counts) (do ([i 1 (fx+ i 1)]) ([= i num-cpus] (void)) (vector-set! allstacks i (make-worker)))) (wait-for-everybody) (printf "Everyone's awake!\n")) (define (shutdown-par) (set! par-finished #t)) (define (par-status) (printf "Par status:\n par-finished ~s\n allstacks: ~s\n stacksizes: ~s\n\n" par-finished (vector-length allstacks) (map shadowstack-tail (vector->list allstacks)))) ;; ---------------------------------------- ;; Try to do work and mark it as done. (define (steal-work! frame) (and (eq? 'available (shadowframe-status frame)) (mutex-acquire (shadowframe-mut frame) #f) ;; Don't block on it ;; From here on out we've got the mutex: (if (eq? 'available (shadowframe-status frame)) ;; If someone beat us here, we fizzle #t (begin (mutex-release (shadowframe-mut frame)) #f)) (begin ;;(printf "STOLE work! ~s\n" frame) (set-shadowframe-status! frame 'stolen) (mutex-release (shadowframe-mut frame)) ;; Then let go to do the real work: (set-shadowframe-argval! frame ((shadowframe-oper frame) (shadowframe-argval frame))) (set-shadowframe-status! frame 'done) #t))) (define (find-and-steal-once!) (let* ([ind (random numprocessors)] [stack (vector-ref allstacks ind)]) (let* ([frames (shadowstack-frames stack)] [tl (shadowstack-tail stack)]) (let frmloop ([i 0]) (if (fx= i tl) #f ;; No work on this processor, try again. (if (steal-work! (vector-ref frames i)) #t (frmloop (fx+ 1 i)))))))) (define (make-worker) (define stack (new-stack)) (fork-thread (lambda () (this-stack stack) ;; Initialize stack. (with-mutex global-mut ;; Register our existence. (set! threads-registered (add1 threads-registered))) ;; Steal work forever: (let forever () (unless par-finished (find-and-steal-once!) (forever))))) stack) (define-syntax pcall (syntax-rules () [(_ op (f x) e2) (let ([stack (this-stack)]) (define (push! oper val) (let ([frame (vector-ref (shadowstack-frames stack) (shadowstack-tail stack))]) ;; Initialize the frame (set-shadowframe-oper! frame oper) (set-shadowframe-argval! frame val) (set-shadowframe-status! frame 'available) (set-shadowstack-tail! stack (fx+ (shadowstack-tail stack) 1)) frame)) (define (pop!) (set-shadowstack-tail! stack (fx- (shadowstack-tail stack) 1))) (let ([frame (push! f x)]) (let ([val1 e2]) ;; We're the parent, when we get to this frame, we lock it off from all other comers. ;; Thieves should do non-blocking probes. (let waitloop () (mutex-acquire (shadowframe-mut frame)) (case (shadowframe-status frame) [(available) (set-shadowframe-status! frame 'stolen) ;; Just in case... (pop!) ;; Pop before we even start the thunk. (mutex-release (shadowframe-mut frame)) (op ((shadowframe-oper frame) (shadowframe-argval frame)) val1)] ;; Oops, they may be waiting to get back in here and set the result, let's get out quick. [(stolen) ;; Let go of this so they can finish and mark it as done. (mutex-release (shadowframe-mut frame)) (find-and-steal-once!) ;; Meanwhile we should go try to make ourselves useful.. (waitloop)] ;; When we're done with that come back and see if our outsourced job is done. ;; It was stolen and is now completed: [else (pop!) (mutex-release (shadowframe-mut frame)) (op (shadowframe-argval frame) val1)])) )))])) ;; Returns values in a list (define-syntax par (syntax-rules () [(_ a b) (pcall list ((lambda (_) a) #f) b)])) (define-syntax parmv (syntax-rules () [(_ a b) (pcall values ((lambda (_) a) #f) b)])) ;;================================================================================ (init-par 4) (let () (define (tree n) (if (fxzero? n) 1 (pcall fx+ (tree (fx- n 1)) (tree (fx- n 1))))) (let ([n (time (tree test-depth))]) (shutdown-par) n))) 33554432) ($thread-check) ; make sure thread can return zero values (eqv? (let ([x #f]) (fork-thread (lambda () (call/cc (lambda (k) (set! x 'frob) (k) (set! x 'borf))))) (let f () (unless x ($yield) (f))) x) 'frob) ($thread-check) (equal? (let () (define n 10) (define m (make-mutex)) (define c (make-condition)) (define-ftype A (struct (x double) (y iptr))) (define a (make-ftype-pointer A (foreign-alloc (ftype-sizeof A)))) (define ls '()) (ftype-set! A (x) a 3.4) (ftype-set! A (y) a 1) (with-mutex m (do ([n n (fx- n 1)]) ((fx= n 0)) (fork-thread (lambda () (define fat+ (lambda (x y) (if (zero? y) x (fat+ (1+ x) (1- y))))) (define fatfib (lambda (x) (if (< x 2) 1 (fat+ (fatfib (1- x)) (fatfib (1- (1- x))))))) (do ([i (fx+ 1000 n) (fx- i 1)] [q (fatfib 4) (fatfib 4)]) ((fx= i 0) (with-mutex m (set! ls (cons q ls)) (condition-signal c))) (if (odd? n) (ftype-locked-incr! A (y) a) (when (ftype-locked-decr! A (y) a) (printf "woohoo!\n"))))))) (let loop () (if (equal? (length ls) n) (list (ftype-ref A (x) a) (ftype-ref A (y) a)) (begin (condition-wait c m) (loop)))))) '(3.4 -4)) ($thread-check) (begin (load-shared-object (format "~a/foreign1.so" *mats-dir*)) #t) (equal? (let () (define ff-locked-incr! (foreign-procedure "locked_incr" ((* iptr)) boolean)) (define ff-locked-decr! (foreign-procedure "locked_decr" ((* iptr)) boolean)) (define n 10) (define m (make-mutex)) (define c (make-condition)) (define-ftype A (struct (x double) (y iptr))) (define a (make-ftype-pointer A (foreign-alloc (ftype-sizeof A)))) (define ls '()) (ftype-set! A (x) a 3.4) (ftype-set! A (y) a 1) (with-mutex m (do ([n n (fx- n 1)]) ((fx= n 0)) (fork-thread (lambda () (define fat+ (lambda (x y) (if (zero? y) x (fat+ (1+ x) (1- y))))) (define fatfib (lambda (x) (if (< x 2) 1 (fat+ (fatfib (1- x)) (fatfib (1- (1- x))))))) (do ([i (fx+ 1000 n) (fx- i 1)] [q (fatfib 10) (fatfib 10)]) ((fx= i 0) (with-mutex m (set! ls (cons q ls)) (condition-signal c))) (if (odd? n) (ff-locked-incr! (ftype-&ref A (y) a)) (when (ff-locked-decr! (ftype-&ref A (y) a)) (printf "hoowoo!\n"))))))) (let loop () (if (equal? (length ls) n) (list (ftype-ref A (x) a) (ftype-ref A (y) a)) (begin (condition-wait c m) (loop)))))) '(3.4 -4)) ($thread-check) (eqv? (let () (define n 10) (define m (make-mutex)) (define c (make-condition)) (define-ftype A (struct (x double) (y iptr))) (define a (make-ftype-pointer A (foreign-alloc (ftype-sizeof A)))) (define ls '()) (ftype-set! A (x) a 3.5) (ftype-init-lock! A (y) a) (with-mutex m (do ([n n (fx- n 1)]) ((fx= n 0)) (fork-thread (lambda () (define fat+ (lambda (x y) (if (zero? y) x (fat+ (1+ x) (1- y))))) (define fatfib (lambda (x) (if (< x 2) 1 (fat+ (fatfib (1- x)) (fatfib (1- (1- x))))))) (do ([i (+ 1000 n) (fx- i 1)] [q (fatfib 4) (fatfib 4)]) ((fx= i 0) (with-mutex m (set! ls (cons q ls)) (condition-signal c))) ; disable collections so we don't rendezvous for collection while holding the lock, ; leaving some other thread spinning hopelessly in ftype-spin-lock! (with-interrupts-disabled (if (odd? i) (let loop () (unless (ftype-lock! A (y) a) (printf "waiting\n") (loop))) (ftype-spin-lock! A (y) a)) (ftype-set! A (x) a ((if (odd? n) + -) (ftype-ref A (x) a) 1.0)) (ftype-unlock! A (y) a)))))) (let loop () (if (equal? (length ls) n) (ftype-ref A (x) a) (begin (condition-wait c m) (loop)))))) -1.5) ($thread-check) (parameterize ([collect-request-handler void]) (define (expect-error what thunk) (guard (c [(and (message-condition? c) (equal? (condition-message c) (format "~a is defunct" what)))]) (thunk) (error #f "error expected"))) (let ([g (make-guardian)]) (g (make-mutex)) (collect) (let ([m (g)]) (expect-error 'mutex (lambda () (mutex-acquire m))) (expect-error 'mutex (lambda () (mutex-release m))) (expect-error 'mutex (lambda () (condition-wait (make-condition) m)))) (g (make-condition)) (collect) (let ([c (g)]) (expect-error 'condition (lambda () (condition-wait c (make-mutex)))) (expect-error 'condition (lambda () (condition-broadcast c))) (expect-error 'condition (lambda () (condition-signal c)))))) ) (mat make-thread-parameter (begin (define p (make-thread-parameter #f not)) #t) (p) (begin (p #f) (p)) (begin (p #t) (not (p))) (begin (define q (make-thread-parameter #t)) #t) (q) (begin (q #f) (not (q))) (begin (q #t) (q)) (or (= (optimize-level) 3) (guard (c [(and (message-condition? c) (equal? (condition-message c) "~s is not a procedure") (irritants-condition? c) (equal? (condition-irritants c) (list 2)))]) (make-thread-parameter 1 2))) (begin (define p (make-thread-parameter 5 (lambda (x) (+ x 1)))) #t) (eqv? (p) 6) (or (= (optimize-level) 3) (guard (c [(and (message-condition? c) (equal? (condition-message c) "~s is not a number") (irritants-condition? c) (equal? (condition-irritants c) (list 'a)))]) (p 'a))) ) (mat cas (begin (define (check container container-ref container-cas!) (let ([N 1000] [M 4] [done 0] [m (make-mutex)] [c (make-condition)]) (define (bump) (let loop ([i 0]) (unless (= i N) (let ([v (container-ref container)]) (if (container-cas! container v (add1 v)) (loop (add1 i)) (loop i))))) (mutex-acquire m) (set! done (add1 done)) (condition-signal c) (mutex-release m)) (let loop ([j 0]) (when (< j M) (fork-thread bump) (loop (add1 j)))) (mutex-acquire m) (let loop () (cond [(= done M) (mutex-release m)] [else (condition-wait c m) (loop)])) (= (container-ref container) (* M N)))) (check (box 0) unbox box-cas!)) (check (vector 1 0 2) (lambda (v) (vector-ref v 1)) (lambda (v o n) (vector-cas! v 1 o n)))) ; give the bump threads 0.2 s to finish (sleep (make-time 'time-duration 200000000 0)) )