Janet 1.16.1-87f8fe1 Documentation
(Other Versions: 1.15.0 1.13.1 1.12.2 1.11.1 1.10.1 1.9.1 1.8.1 1.7.0 1.6.0 1.5.1 1.5.0 1.4.0 1.3.1 )

The Event Loop

In addition to threads, Janet comes with another powerful concurrency model out of the box - the event loop. The event loop provides concurrency within a single thread by allowing cooperating fibers to yield instead of blocking forward progress. This is a form of cooperative multi-threading that can be useful in many applications, like situations where there are many concurrent, IO-bound tasks. This means tasks that spend most of their execution time waiting for network or disk, rather than using CPU time.

Most event loop functionality in Janet is exposed through the ev/ module, but many other functions outside this module will interact with the event loop.

An Overview of the Event Loop

Janet is by no means the first language to provide an event loop, and most languages or tools that provide them work in the same general way.

All programs in Janet are wrapped in an implicit loop that will run until all tasks are complete. Following is pseudo-code describing how the event loop works - the event loop code is all actually implemented directly in the runtime for efficiency.

# Functions that yield to the event loop will put (fiber/root-fiber) here.
(def pending-tasks @[])

# Pseudo-code of the event loop scheduler
(while (not (empty? pending-tasks))
 (def [root-fiber data] (wait-for-next-event))
 (resume root-fiber data))

Each root-fiber, or task, is a fiber that will be automatically resumed when an event (or sequence of events) that it is waiting for occurs. Generally, one should not manually resume tasks - the event loop will call resume when the completion event occurs.

To be precise, a task is just any fiber that was scheduled to run by the event loop. Therefor, all tasks are fibers, but not all fibers are tasks. You can get the currently executing task in Janet with (fiber/root-fiber).

Blocking the Event Loop

When using the event loop, it is important to be aware that CPU bound loops will block all other tasks on the event loop. For example, an infinite while loop that does not yield to the event will bring the entire thread to a halt! This is generally considered to be one of the biggest drawbacks to cooperative multi-threading, so be cautious and do not mix in blocking functions into a program that uses the event loop. Many functions in Janet's core library will not block when the event loop is enabled, but a few will. All functions in the file/ module will block, so be careful when using these with networked files or file/popen. Also, thread/receive and thread/send will block, so should not be used in applications that use the event loop. These functions may be changed or deprecated in the future. Other blocking functions include os/sleep and getline.

NOTE: This means that getting input from the repl will block the event loop! Be careful of this when trying to use ev functions from the built in repl. Instead, you can use a stream-based repl such as spork/netrepl which will interact well with the event loop.

Creating Tasks

A default Janet program has a single task that will run until complete. To create new tasks, Janet provides two built-in functions - ev/go and ev/call. ev/call can be implement in terms of ev/go, and other macros and functions are wrappers around these two functions.

(defn worker
 "Does some work."
 [name n]
 (for i 0 n
  (print name " working " i "...")
  (ev/sleep 0.5))
 (print name " is done!"))

# Start bob working in a new task with ev/call
(ev/call worker "bob" 10)

(ev/sleep 0.25)

# Start sally working in a new task with ev/go
(ev/go (fiber/new |(worker "sally" 20)))

(ev/sleep 11)
(print "Everyone should be done by now!")

This example will start two worker tasks that will run concurrently with each other. The ev/call function takes a function as well as it's arguments and calls that function inside a new task. ev/call returns the newly created task fiber immediately.

The more general way to start new tasks is ev/go, which takes as an argument a fiber to be resumed by the event loop scheduler. ev/go allows the programmer to customize fiber flags as well as specify a supervisor channel for the newly created task.

Lastly, there is the ev/spawn macro for concisely running a series of forms in a new task.

Task Communication

To communicate between tasks, the ev/ module offers two abstractions: streams and channels. Both abstraction works as FIFO (First In, First Out) data structures, but operate on different kinds of data. Channels allow the programmer to communicate by sending any Janet value as messages, and only work inside a thread - they do not allow communication between threads, processes, or over the network. Streams are wrappers around file descriptors and operate on streams of bytes. Streams can communicate across threads, processes, and across the network.

With these constraints, channels are preferred for things like internal queues and between-task communication, where streams are useful for most everything else.

Channels

Channels are based around a queue abstraction, where producer tasks will add items to the channel with ev/give, and consumer tasks will remove items from the channel with ev/take. If there are no items in the channel when ev/take is called, the consumer will suspend until a producer gives an item to the channel. Similarly, if a task gives to a full channel, it will suspend until a consumer takes an item from the queue, freeing up space for the producer to put an item.

Internally, channels are infinitely expanding queues, so one might think that they never need to be "full", but allowing channels to fill up and suspend writers allows for back pressure. Back pressure means that a task can indicate to other tasks that it is "backed up", and that they should stop sending it "work".

# Create a new channel with a limit of 10 items.
(def channel (ev/chan 10))

# Write to the channel forever
(ev/spawn
 (forever
  (ev/give channel (math/random))))

(defn consumer
 "Take from the channel forever."
 [name delay]
 (forever
  (def item (ev/take channel))
  (print name " got item " item)
  (ev/sleep delay)))

(ev/call consumer "bob" 1)
(ev/call consumer "sally" 0.63)

Streams

Streams let Janet do IO without blocking. They provide a very similar interface to files, with a few extra caveats and capabilities. There are several ways to acquire streams, such as using the net/ module, os/open, and os/pipe.

To read from a stream, use ev/read or ev/chunk. These can also be accessed with :read and :chunk methods on streams.

To write to a stream, use the function ev/write. This can be referenced as the :write method on streams as well.

For some examples on how to use streams, see documentation on networking in Janet.

Cancellation

Sometimes, IO operations can take to long or even hang indefinitely. Janet offers ev/cancel to interrupt and cancel an ongoing IO operation. This will cause the canceled task to be resumed but immediately error. From the point of view of the canceled task, it will look as though the last function that yielded to event loop raised an error.

Macros like try and protect will continue to work just as if the cancellation error was any other error.

(def f
 (ev/spawn
  (print "starting long io...")
  (ev/sleep 10000)
  (print "finished long io!")))

# wait 2 seconds before canceling the long IO.
(ev/sleep 2)
(ev/cancel f "canceled")

Supervisor Channels

An interesting problem with supporting multiple tasks is error handling - if a task raises and error (or any other uncaught signal), who should handle it? The programmer often wants to know when a task fails (or finishes), and then continue with some other code. Alternatively, it might be desirable to try again a number of times before really giving up.

Janet has the concept of supervisor channels to support this. When a new task is scheduled with ev/go, one can optional specify a channel to set as the new tasks supervisor. When the task yields to the event loop, or raises any kind of fiber signal, as message is automatically written to the supervisor channel. Another task can then continuously listen on that channel to monitor other tasks, handling errors or restarting them. This is a powerful pattern that can be used to build robust applications that gracefully handle failure.

(def channel (ev/chan))

(var counter 0)

(defn worker
 "A worker that is unreliable and sometimes fails."
 []
 (repeat 50
  (print "working...")
  (ev/sleep 0.1)
  (when (> 0.05 (math/random))
    (print "worker errored out!")
    (error "unexpected error"))
  (++ counter))
 (print "worker finished normally!"))

(defn start-worker
  "Start a worker with the given channel as its supervisor"
  []
  (print "starting new worker...")
  (ev/go (fiber/new worker :tp) nil channel))

# make sure there are always two workers until counter gets to 100
(start-worker)
(forever
  (start-worker)
  (def [status fiber] (ev/take channel))
  (print "worker " fiber " completed with status :" status)
  (if (>= counter 100) (break)))

The fiber mask argument (:tp above) determines what signals are automatically written to the supervisor channel. By default, the fiber will only write to the supervisor channel when it completes successfully, with the message [:ok fiber]. The most useful setting here is :t, which cause the fiber to write an event to the supervisor channel when it terminates. These events will be tuples of 2 arguments, (signal fiber).

A task can also write custom messages to its supervisor with ev/give-supervisor.