Janet 1.15.0-2795e8a Documentation
The Event Loop
In addition to threads, Janet comes with another powerful concurrency model out of the box - the event loop. The event loop provides concurrency within a single thread by allowing cooperating fibers to yield instead of blocking forward progress. This is a form of cooperative multi-threading that can be useful in many applications, like situations where there are many concurrent, IO-bound tasks. This means tasks that spend most of their execution time waiting for network or disk, rather than using CPU time.
Most event loop functionality in Janet is exposed through the
ev/ module, but many other functions outside this module
will interact with the event loop.
An Overview of the Event Loop
Janet is by no means the first language to provide an event loop, and most languages or tools that provide them work in the same general way.
All programs in Janet are wrapped in an implicit loop that will run until all tasks are complete. Following is pseudo-code describing how the event loop works - the event loop code is all actually implemented directly in the runtime for efficiency.
# Functions that yield to the event loop will put (fiber/root-fiber) here. (def pending-tasks @) # Pseudo-code of the event loop scheduler (while (not (empty? pending-tasks)) (def [root-fiber data] (wait-for-next-event)) (resume root-fiber data))
root-fiber, or task, is a fiber that will be automatically resumed
when an event (or sequence of events) that it is waiting for occurs. Generally, one should not manually
resume tasks - the event loop will call
resume when the completion event occurs.
To be precise, a task is just any fiber that was scheduled to run by the event loop.
Therefor, all tasks are fibers, but not all fibers are tasks. You can get the currently executing task in Janet with
Blocking the Event Loop
When using the event loop, it is important to be aware that CPU bound loops will block all other tasks on the
event loop. For example, an infinite while loop that does not yield to the event will bring the entire thread to a halt!
This is generally considered to be one of the biggest drawbacks to cooperative multi-threading, so be cautious and do not mix in
blocking functions into a program that uses the event loop. Many functions in Janet's core library will not block when the event loop is enabled, but
a few will. All functions in the
file/ module will block, so be careful when using these with networked files or
thread/send will block, so should not be used in applications that use the event loop. These
functions may be changed or deprecated in the future. Other blocking functions include
NOTE: This means that getting input from the repl will block the event loop! Be careful of this when trying to use ev functions
from the built in repl. Instead, you can use a stream-based repl such as
spork/netrepl which will interact well with the event loop.
A default Janet program has a single task that will run until complete. To create new tasks, Janet provides two
built-in functions -
ev/call can be implement in terms of
ev/go, and other
macros and functions are wrappers around these two functions.
(defn worker "Does some work." [name n] (for i 0 n (print name " working " i "...") (ev/sleep 0.5)) (print name " is done!")) # Start bob working in a new task with ev/call (ev/call worker "bob" 10) (ev/sleep 0.25) # Start sally working in a new task with ev/go (ev/go (fiber/new |(worker "sally" 20))) (ev/sleep 11) (print "Everyone should be done by now!")
This example will start two worker tasks that will run concurrently with each other. The
ev/call function takes a function as well as it's arguments
and calls that function inside a new task.
ev/call returns the newly created task fiber immediately.
The more general way to start new tasks is
ev/go, which takes as an argument a fiber to be resumed by the event loop scheduler.
ev/go allows the programmer to customize fiber flags as well as specify a supervisor channel for the newly created task.
Lastly, there is the
ev/spawn macro for concisely running a series of forms in a new task.
To communicate between tasks, the
ev/ module offers two abstractions: streams and channels. Both abstraction works as FIFO (First In, First Out) data structures, but operate
on different kinds of data. Channels allow the programmer to communicate by sending any Janet value as messages, and only work inside a thread - they do not allow communication
between threads, processes, or over the network. Streams are wrappers around file descriptors and operate on streams of bytes. Streams can communicate across threads, processes, and across
With these constraints, channels are preferred for things like internal queues and between-task communication, where streams are useful for most everything else.
Channels are based around a queue abstraction, where producer tasks will add items to the channel with
ev/give, and consumer tasks will remove items from the
ev/take. If there are no items in the channel when
ev/take is called, the consumer will suspend until a producer gives an item
to the channel. Similarly, if a task gives to a full channel, it will suspend until a consumer takes an item from the queue, freeing up space for the producer
to put an item.
Internally, channels are infinitely expanding queues, so one might think that they never need to be "full", but allowing channels to fill up and suspend writers allows for back pressure. Back pressure means that a task can indicate to other tasks that it is "backed up", and that they should stop sending it "work".
# Create a new channel with a limit of 10 items. (def channel (ev/chan 10)) # Write to the channel forever (ev/spawn (forever (ev/give channel (math/random)))) (defn consumer "Take from the channel forever." [name delay] (forever (def item (ev/take channel)) (print name " got item " item) (ev/sleep delay))) (ev/call consumer "bob" 1) (ev/call consumer "sally" 0.63)
Streams let Janet do IO without blocking. They provide a very similar interface to files, with a few extra caveats and capabilities. There are several ways to acquire streams, such
as using the
To read from a stream, use
ev/chunk. These can also be accessed with
:chunk methods on streams.
To write to a stream, use the function
ev/write. This can be referenced as the
:write method on streams as well.
For some examples on how to use streams, see documentation on networking in Janet.
Sometimes, IO operations can take to long or even hang indefinitely. Janet offers
ev/cancel to interrupt and cancel an ongoing IO operation. This will cause the canceled
task to be resumed but immediately error. From the point of view of the canceled task, it will look as though the last function that yielded to event loop raised an error.
protect will continue to work just as if the cancellation error was any other error.
(def f (ev/spawn (print "starting long io...") (ev/sleep 10000) (print "finished long io!"))) # wait 2 seconds before canceling the long IO. (ev/sleep 2) (ev/cancel f "canceled")
An interesting problem with supporting multiple tasks is error handling - if a task raises and error (or any other uncaught signal), who should handle it? The programmer often wants to know when a task fails (or finishes), and then continue with some other code. Alternatively, it might be desirable to try again a number of times before really giving up.
Janet has the concept of supervisor channels to support this. When a new task is scheduled with
ev/go, one can optional specify a channel to set as the new tasks supervisor.
When the task yields to the event loop, or raises any kind of fiber signal, as message is automatically written to the supervisor channel. Another task can then continuously listen on that
channel to monitor other tasks, handling errors or restarting them. This is a powerful pattern that can be used to build robust applications that gracefully handle failure.
(def channel (ev/chan)) (var counter 0) (defn worker "A worker that is unreliable and sometimes fails."  (repeat 50 (print "working...") (ev/sleep 0.1) (when (> 0.05 (math/random)) (print "worker errored out!") (error "unexpected error")) (++ counter)) (print "worker finished normally!")) (defn start-worker "Start a worker with the given channel as its supervisor"  (print "starting new worker...") (ev/go (fiber/new worker :tp) nil channel)) # make sure there are always two workers until counter gets to 100 (start-worker) (forever (start-worker) (def [status fiber] (ev/take channel)) (print "worker " fiber " completed with status :" status) (if (>= counter 100) (break)))
The fiber mask argument (
:tp above) determines what signals are automatically written to the supervisor channel. By default, the fiber will only write to the supervisor channel
when it completes successfully, with the message
[:ok fiber]. The most useful setting here is
:t, which cause the fiber to write an event to the supervisor channel when it terminates.
These events will be tuples of 2 arguments,
A task can also write custom messages to its supervisor with