Janet 1.8.1-5b6b9f1 Documentation
(Other Versions:
1.7.0
1.6.0
1.5.1
1.5.0
1.4.0
1.3.1
)
Multithreading
Starting in version 1.6.0, Janet has the ability to do true, non-cooperative multithreading with the thread/
functions. Janet threads correspond to native threads on the host operating system, which may be either pthreads on
posix systems or Windows threads. Each thread has its own Janet heap, which means threads behave more like processes
that communicate by message passing. However, this does not prevent native code from sharing memory across these threads.
Without native extensions, however, the only way for two Janet threads to communicate directly is through message passing.
Creating Threads
Use `(thread/new func &opt capacity)` to create a new thread. This thread will start and wait for a message containing a function
that it will run as the main body. This function must also be able to take 1 parameter, the parent thread.
Each thread has its own mailbox, which can asynchronously receive messages. These messages are added to a queue, which has
a configurable maximum capacity according to the second optional argument passed to thread/new
. If unspecified, the mailbox
will have an initial capacity of 10.
(defn worker
[parent]
(print "New thread started!"))
# New thread's mailbox has capacity for 32 messages.
(def thread (thread/new worker 32))
Sending and Receiving Messages
Since threads do not share memory a heap, the way threads must communicate is through message passing. Use thread/send
to send a message to a thread, and thread/receive
to get messages sent to the current thread.
(defn worker
[parent]
(print "waiting for message...")
(def msg (thread/receive))
(print "got message: " msg))
(def thread (thread/new worker))
# Thread objects support the :send method as an alias for thread/send.
(:send thread "Hello!")
Limitations of Messages
Since threads do not share Janet heaps, all values sent as messages are first marshalled to a byte sequence by the sending thread, and then
unmarshalled by the receiving thread. For marshalling and unmarshalling, threads use the two tables make-image-dict
and
load-image-dict
. This means you can send over core bindings, even if the underlying value cannot be marshalled.
Semantically, messages sent with (thread/send to msg)
are first converted to a buffer
via (marshal msg make-image-dict mailbox-buf)
,
and then unmarshalled by the receiving thread via (unmarshal mailbox-buf load-image-dict)
.
Values that cannot be marshalled, including thread values, cannot be sent as messages to other threads, or even as part of a message to another thread. For example, the following will not work because open file handles cannot be marshalled.
(def file (file/open "myfile.txt" :w))
(defn worker
[parent]
(with-dyns [:out file]
(print "Writing to file.")))
# Will throw an error, as worker contains a reference to file, which cannot be marshalled.
(thread/new worker)
The fix here is to move the file creation inside the worker function, since every worker thread is going to have its own copy of the file handle.
(defn worker
[parent]
(def file (file/open "myfile.txt" :w))
(with-dyns [:out file]
(print "Writing to file.")))
# No error
(thread/new worker)
Blocking and Non-blocking Sends and Receives
All sends and receives can be blocking or non-blocking by providing an optional timeout value. A timeout
of 0 makes (thread/send to msg &opt timeout)
and (thread/receive &opt timeout)
non-blocking, while a positive timeout value
indicates and blocking send or receive, that will resume the current thread by throwing an error after
timeout seconds. Timeouts are limited to 30 days, with exception the using math/inf
for a timeout means that
a timeout will never occur.
(defn worker
[parent]
(for i 0 10
(os/sleep 0.5)
(:send parent :ping))
# sleep a long while to make the parent timeout
(os/sleep 2)
(:send parent :done))
# Will print the first 10 pings, but timeout and throw an error before :done
(try
(let [t (thread/new worker)]
(for i 0 11
(print "got message: " (thread/receive 1))))
([err] (print "error: " err)))
# Flush :done message
(thread/receive math/inf)
# Will not error and work successfully
(let [t (thread/new worker)]
(for i 0 11
(print "got message: " (thread/receive math/inf))))