This module works only with one of the --mm:arc, --mm:atomicArc, or --mm:orc compilation flags.
This module implements multi-producer multi-consumer channels - a concurrency primitive with a high-level interface intended for communication and synchronization between threads. It allows sending and receiving typed, isolated data, enabling safe and efficient concurrency.
The Chan type represents a generic fixed-size channel object that internally manages the underlying resources and synchronization. It has to be initialized using the newChan proc. Sending and receiving operations are provided by the blocking send and recv procs, and non-blocking trySend and tryRecv procs. Send operations add messages to the channel, receiving operations remove them.
The following is a simple example of two different ways to use channels: blocking and non-blocking.
Example: cmd: --threads:on --gc:orc
```nim
import sync/channels
import std/os

# In this example a channel is declared at module scope.
# Channels are generic, and they include support for passing objects between
# threads.
# Note that isolated data passed through channels is moved around.
var chan = newChan[string]()

block example_blocking:
  # This proc will be run in another thread.
  proc basicWorker() =
    chan.send("Hello World!")

  # Launch the worker.
  var worker: Thread[void]
  createThread(worker, basicWorker)

  # Block until the message arrives, then print it out.
  var dest = ""
  dest = chan.recv()
  assert dest == "Hello World!"

  # Wait for the thread to exit before moving on to the next example.
  worker.joinThread()

block example_non_blocking:
  # This is another proc to run in a background thread. This proc takes a while
  # to send the message since it first sleeps for some time.
  proc slowWorker(delay: Natural) =
    # `delay` is a period in milliseconds
    sleep(delay)
    chan.send("Another message")

  # Launch the worker with a delay set to 2 seconds (2000 ms).
  var worker: Thread[Natural]
  createThread(worker, slowWorker, 2000)

  # This time, use a non-blocking approach with tryRecv.
  # Since the main thread is not blocked, it could be used to perform other
  # useful work while it waits for data to arrive on the channel.
  var messages: seq[string]
  while true:
    var msg = ""
    if chan.tryRecv(msg):
      messages.add msg # "Another message"
      break
    messages.add "Pretend I'm doing useful work..."
    # For this example, sleep in order not to flood the sequence with too many
    # "pretend" messages.
    sleep(400)

  # Wait for the second thread to exit before cleaning up the channel.
  worker.joinThread()

  # Thread exits right after receiving the message
  assert messages[^1] == "Another message"
  # At least one non-successful attempt to receive the message had to occur.
  assert messages.len >= 2
```
Procs
proc `=destroy`[T](c: Chan[T]) {.raises: [].}
proc `=wasMoved`[T](x: var Chan[T])
proc newChan[T](elements: Positive = 30): Chan[T]
An initialization procedure, necessary for acquiring resources and initializing internal state of the channel.
elements is the capacity of the channel and thus how many messages it can hold before it refuses to accept any further messages.
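The capacity rule can be observed from a single thread. The following is a minimal sketch, not a definitive test: it assumes the import path `sync/channels` used in the module example above, takes `isolate` from `std/isolation`, and assumes that the channel accepts exactly `elements` messages before refusing more, as the description states. The names `capacity` and `accepted` are illustrative only.

```nim
import sync/channels
import std/isolation

# Illustrative capacity; any positive value should behave the same way.
const capacity = 3
var chan = newChan[int](elements = capacity)

# Keep sending without blocking until the channel refuses a message.
var accepted = 0
while chan.trySend(isolate(accepted)):
  inc accepted

# Assumption: the channel holds exactly `capacity` pending messages.
doAssert accepted == capacity

# Removing one message frees one slot again.
var first: int
doAssert chan.tryRecv(first)
doAssert chan.trySend(isolate(99))
```

A small capacity makes back-pressure visible early, while the default of 30 gives producers more slack before they block or fail.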
proc recv[T](c: Chan[T]): T {.inline.}
Receives a message from the channel c. This version of recv returns the message instead of storing it in a dst parameter.
proc recv[T](c: Chan[T]; dst: var T) {.inline.}
Receives a message from the channel c and fills dst with its value.
This blocks the receiving thread until a message has been successfully received. If the channel is empty, the thread blocks until a message is sent to the channel.
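The two recv overloads differ only in how the message is handed back. As a small sketch, assuming the same `sync/channels` import path as the module example and using `isolate` from `std/isolation` to build the `Isolated[T]` argument that send expects:

```nim
import sync/channels
import std/isolation

var chan = newChan[string]()

# Overload that returns the message.
chan.send(isolate("first"))
let a = chan.recv()
doAssert a == "first"

# Overload that fills an existing variable.
chan.send(isolate("second"))
var b = ""
chan.recv(b)
doAssert b == "second"
```

Each recv here follows a send on the same thread, so neither call blocks. With an empty channel, both overloads would wait for another thread to send.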
proc send[T](c: Chan[T]; src: sink Isolated[T]) {.inline.}
Sends the message src to the channel c. This blocks the sending thread until src has been successfully sent.
The memory of src is moved, not copied.
If the channel is already full, this blocks the thread until messages are removed from the channel.
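A blocking send is easiest to see when the producer sends more messages than the channel can hold. The sketch below assumes the `sync/channels` import path from the module example, wraps each value with `isolate` from `std/isolation`, and must be compiled with threads enabled and one of the memory-management flags the module requires. The proc name `producer` is illustrative.

```nim
import sync/channels
import std/isolation

# Capacity 2 is smaller than the 10 messages sent below, so the producer
# has to wait whenever the consumer falls behind.
var chan = newChan[int](elements = 2)

proc producer() =
  for i in 1 .. 10:
    # Blocks while the channel is full, resumes once a slot is free.
    chan.send(isolate(i))

var worker: Thread[void]
createThread(worker, producer)

# The main thread consumes all ten messages.
var total = 0
for _ in 1 .. 10:
  total += chan.recv()

worker.joinThread()
doAssert total == 55  # 1 + 2 + ... + 10
```

Because both sides block, no message is dropped and no polling loop is needed.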
proc tryRecv[T](c: Chan[T]; dst: var T): bool {.inline.}
Tries to receive a message from the channel c and fill dst with its value.
Doesn't block waiting for messages in the channel to become available. Instead, it returns after an attempt to receive a message has been made.
Warning: In high-concurrency situations, consider using an exponential backoff strategy to reduce contention and improve the success rate of operations.
Returns false and does not change dst if no message was received.
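One possible shape for the exponential backoff mentioned in the warning is sketched below. The helper `recvWithBackoff`, its `maxAttempts` parameter, and the delay values are hypothetical and not part of this module. The sketch assumes the `sync/channels` import path from the module example and uses `sleep` from `std/os`.

```nim
import sync/channels
import std/[isolation, os]

var chan = newChan[string]()

# Hypothetical helper: polls with tryRecv and doubles the pause after each
# failed attempt, up to a cap of 64 ms.
proc recvWithBackoff(c: Chan[string]; dst: var string;
                     maxAttempts = 8): bool =
  var delayMs = 1
  for _ in 1 .. maxAttempts:
    if c.tryRecv(dst):
      return true
    sleep(delayMs)
    delayMs = min(delayMs * 2, 64)
  false

proc worker() =
  sleep(20)  # simulate a producer that is not ready immediately
  chan.send(isolate("done"))

var t: Thread[void]
createThread(t, worker)

var msg = ""
# Twelve attempts wait several hundred milliseconds in total, far longer
# than the worker's 20 ms delay.
let ok = recvWithBackoff(chan, msg, maxAttempts = 12)
t.joinThread()

doAssert ok
doAssert msg == "done"
```

Capping the delay keeps latency bounded once a message does arrive. The right cap and attempt count depend on the workload.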
proc trySend[T](c: Chan[T]; src: sink Isolated[T]): bool {.inline.}
Tries to send the message src to the channel c.
The memory of src will be moved if possible. Doesn't block waiting for space in the channel to become available. Instead, it returns after an attempt to send a message has been made.
Warning: In high-concurrency situations, consider using an exponential backoff strategy to reduce contention and improve the success rate of operations.
Returns false if the message was not sent because the channel was already full.
proc tryTake[T](c: Chan[T]; src: var Isolated[T]): bool {.inline.}
Tries to send the message src to the channel c.
The memory of src is moved directly. Be careful not to reuse src afterwards. This proc is suitable when src cannot be copied.
Doesn't block waiting for space in the channel to become available. Instead, it returns after an attempt to send a message has been made.
Warning: In high-concurrency situations, consider using an exponential backoff strategy to reduce contention and improve the success rate of operations.
Returns false if the message was not sent because the channel was already full.
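Unlike trySend, tryTake needs a mutable `Isolated[T]` variable rather than a temporary. The sketch below shows only the successful case and assumes the `sync/channels` import path from the module example. The variable names are illustrative.

```nim
import sync/channels
import std/isolation

var chan = newChan[seq[int]]()

# tryTake takes its argument as `var`, so the isolated value must live in
# a variable.
var payload = isolate(@[1, 2, 3])

# The channel is empty, so the attempt is expected to succeed.
let sent = chan.tryTake(payload)
doAssert sent

# `payload` has been moved into the channel; it is not read again here.
let received = chan.recv()
doAssert received == @[1, 2, 3]
```

The example deliberately stops using `payload` after the call, as the description advises.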