Channels — Communication between tasklets

A channel object is used for communication between tasklets.

By sending on a channel within a tasklet, another tasklet that is waiting to receive on the channel is resumed. If there is no waiting receiver, then the sender is suspended.

By receiving on a channel within a tasklet, another tasklet that is waiting on the channel to send is resumed. If there is no waiting sender, then the receiver is suspended.

Channels and threads

Channels are thread-safe. This means that tasklets running in one thread can use them to communicate with tasklets running in another. This is covered in the Threads and Stackless section.

The channel class

class channel
channel.send(value)

Send a value over the channel. If no other tasklet is already receiving on the channel, the sender will be blocked. Otherwise, the receiver will be activated immediately, and the sender is put at the end of the runnables list.

Example - sending a value over a channel:

>>> c = stackless.channel()
>>> def sender(chan, value):
...     chan.send(value)
...
>>> stackless.tasklet(sender)(5)
>>> stackless.run()
>>> print c.receive()
5
channel.receive()

Receive a value over the channel. If no other tasklet is already sending on the channel, the receiver will be blocked. Otherwise, the sender will be activated immediately, and the receiver is put at the end of the runnables list.

Example - receiving a value over a channel:

>>> c = stackless.channel()
>>> def receiver(chan):
...     value = chan.receive()
...     print value
...
>>> stackless.tasklet(receiver)(c)
>>> stackless.run()
>>> c.send(5)
5
channel.send_exception(exc, *args)

Send an exception over the channel. The behaviour is the same as for send(), however the receiving tasklet has the exception raised on it rather than receiving a value as it expects.

Example - sending an exception to a waiting receiver:

>>> c = stackless.channel()
>>> def receiver(chan):
...     chan.receive()
...
 >>> stackless.tasklet(receiver)(c)
 <stackless.tasklet object at 0x01BB8130>
 >>> stackless.run()
 >>> c.send_exception(Exception, "xxx")
 Traceback (most recent call last):
   File "<stdin>", line 1, in <module>
   File "<stdin>", line 2, in receiver
 Exception: xxx
channel.send_sequence(seq)

Send a stream of values over the channel. Combined with a generator, this is a very efficient way to build fast pipes.

Example - sending a sequence over a channel:

>>> def sender(channel):
...     channel.send_sequence(sequence)
...
>>> def receiver(channel):
...     count = 0
...     while count < len(sequence):
...             count += 1
...             value = channel.receive()
...             print value
...
>>> c = stackless.channel()
>>> stackless.tasklet(sender)(c)
<stackless.tasklet object at 0x01BB84F0>
>>> stackless.tasklet(receiver)(c)
<stackless.tasklet object at 0x01BB8170>
>>> sequence = range(4)
>>> sequence
[0, 1, 2, 3]
>>> stackless.run()
0
1
2
3
channel.__iter__()

Channels can work as an iterator. When they are used in this way, call overhead is removed on the receiving side, making it an efficient approach.

Example - iterating over a channel:

>>> def sender(channel):
...     for value in sequence:
...         channel.send(value)
...
>>> def receiver(channel):
...     for value in channel:
...         print value
...
>>> c = stackless.channel()
>>> stackless.tasklet(sender)(c)
<stackless.tasklet object at 0x01BB84F0>
>>> stackless.tasklet(receiver)(c)
<stackless.tasklet object at 0x01BB8170>
>>> sequence = range(4)
>>> sequence
[0, 1, 2, 3]
>>> stackless.run()
0
1
2
3
channel.next()
Part of the iteration protocol. Either returns the next value, or raises StopIteration.
channel.open()

Reopen a channel, see close().

Note

This functionality is rarely used in practice.

channel.close()

Prevents the channel queue from growing. If the channel is not empty, the flag closing becomes True. If the channel is empty, the flag closed becomes True.

Note

This functionality is rarely used in practice.

The following attributes can be used to select how the channel should behave with regard to performed channel actions and the scheduling of involved tasklets.

channel.preference

The preference attribute allows you to customise how the channel actions send or receive work with the scheduler.

There are three valid values you can assign it:

-1 Prefer the receiver (the default).
1 Prefer the sender.
0 Do not prefer anything.

It can be very important to your code behaving predictably, that it does one particular side of the channel action during the call. This might be that in a send action, the sending tasklet is blocked and rescheduled, while the waiting receiving tasklet is given the sent value and continues executing immediately. It might be that in a send action, the sending tasklet returns immediately to continue execution while the receiving tasklet is rescheduled. It might even be that both tasklets are scheduled.

The key to getting channel actions to work the way you want is to understand what “prefer” means. The tasklet that is preferred is simply the one whose execution resumes immediately, while the other tasklet is resumes execution when it next gets scheduled.

So if you do not want your send operations to block, you might set your preference attribute to 1. In this way, you could then send to all waiting receivers without blocking, as shown in the pumping the scheduler idiom described elsewhere in this documentation.

On the other hand, if you have a channel you are receiving a lot of data through, you might want to collect all the waiting data in the most efficient way - without blocking.

Example - receiving it all, without blocking:

channel.preference = -1

while channel.balance > 0:
    total += channel.receive()

In fact, by using the handy tasklet.block_trap attribute, that this does not block can be easy verified.

Example - verified receiving without blocking:

channel.preference = -1

old_value = stackless.current.block_trap
try:
    while channel.balance > 0:
        total += channel.receive()
finally:
    stackless.current.block_trap = old_value
channel.schedule_all
Setting this attribute to 1 overrides the value assigned to the preference attribute. If set to 1, then any channel action will result in involved tasklets being scheduled to continue execution later.

Read-only attributes are provided for checking channel state and contents.

channel.balance

The number of tasklets waiting to send (>0) or receive (<0).

Example - reawakening all blocked senders:

>>> while channel.balance > 0:
...     channel.send(None)
channel.closing
The value of this attribute is True when close() has been called.
channel.closed
The value of this attribute is True when close() has been called and the channel is empty.
channel.queue

This value of this attribute is the first tasklet in the chain of tasklets that are blocked on the channel. If the value is None, then the channel is empty.

Example - printing out the chain of tasklets blocked on the channel:

>>> t = channel.queue
>>> idx = 0
>>> while t is not None:
...     print idx, id(t)
...     t = t.next
...     idx += 1
... else:
...     print "The channel is empty."

Table Of Contents

Previous topic

Tasklets — Lightweight threads

Next topic

The scheduler — How tasklets are run

This Page