This section covers some additional topics related to streams.
There are various requirements for how a module can do streaming. To illustrate this, consider these examples:
Scaling a signal by a factor of two.
Performing sample frequency conversion.
Decompressing a run-length encoded signal.
Reading MIDI events from /dev/midi00
and inserting them into a
stream.
The first case is the simplest: upon receiving 200 samples of input the module produces 200 samples of output. It only produces output when it gets input.
The second case produces different numbers of output samples when given 200 input samples. It depends what conversion is performed, but the number is known in advance.
The third case is even worse. From the outset you cannot even guess how much data 200 input bytes will generate (probably a lot more than 200 bytes, but...).
The last case is a module which becomes active by itself, and sometimes produces data.
In aRtss-0.3.4, only streams of the first type were handled, and most things worked nicely. This is probably what you need most when writing modules that process audio. The problem with the other, more complex types of streaming, is that they are hard to program, and that you don't need the features most of the time. That is why we do this with two different stream types: synchronous and asynchronous.
Synchronous streams have these characteristics:
Modules must be able to calculate data of any length, given enough input.
All streams have the same sampling rate.
The calculateBlock()
function will be called when
enough data is available, and the module can rely on the pointers
pointing to data.
There is no allocation and deallocation to be done.
Asynchronous streams, on the other hand, have this behavior:
Modules may produce data sometimes, or with varying sampling rate, or only if they have input from some filed descriptor. They are not bound by the rule “must be able to satisfy requests of any size”.
Asynchronous streams of a module may have entirely different sampling rates.
Outgoing streams: there are explicit functions to allocate packets, to send packets - and an optional polling mechanism that will tell you when you should create some more data.
Incoming streams: you get a call when you receive a new packet - you have to say when you are through with processing all data of that packet, which must not happen at once (you can say that anytime later, and if everybody has processed a packet, it will be freed/reused)
When you declare streams, you use the keyword “async” to indicate you want to make an asynchronous stream. So, for instance, assume you want to convert an asynchronous stream of bytes into a synchronous stream of samples. Your interface could look like this:
interface ByteStreamToAudio : SynthModule { async in byte stream indata; // the asynchronous input sample stream out audio stream left,right; // the synchronous output sample streams };
Suppose you decided to write a module to produce sound asynchronously. Its interface could look like this:
interface SomeModule : SynthModule { async out byte stream outdata; };
How do you send the data? The first method is called “push delivery”. With asynchronous streams you send the data as packets. That means you send individual packets with bytes as in the above example. The actual process is: allocate a packet, fill it, send it.
Here it is in terms of code. First we allocate a packet:
DataPacket<mcopbyte> *packet = outdata.allocPacket(100);
The we fill it:
// cast so that fgets is happy that it has a (char *) pointer char *data = (char *)packet->contents; // as you can see, you can shrink the packet size after allocation // if you like if(fgets(data,100,stdin)) packet->size = strlen(data); else packet->size = 0;
Now we send it:
packet->send();
This is quite simple, but if we want to send packets exactly as fast as the receiver can process them, we need another approach, the “pull delivery” method. You ask to send packets as fast as the receiver is ready to process them. You start with a certain amount of packets you send. As the receiver processes one packet after another, you start refilling them with fresh data, and send them again.
You start that by calling setPull. For example:
outdata.setPull(8, 1024);
This means that you want to send packets over outdata. You want to start sending 8 packets at once, and as the receiver processes some of them, you want to refill them.
Then, you need to implement a method which fills the packets, which could look like this:
void request_outdata(DataPacket<mcopbyte> *packet) { packet->size = 1024; // shouldn't be more than 1024 for(int i = 0;i < 1024; i++) packet->contents[i] = (mcopbyte)'A'; packet->send(); }
Thats it. When you don't have any data any more, you can start sending packets with zero size, which will stop the pulling.
Note that it is essential to give the method the exact name
request_
.
streamname
We just discussed sending data. Receiving data is much much simpler. Suppose you have a simple ToLower filter, which simply converts all letters in lowercase:
interface ToLower { async in byte stream indata; async out byte stream outdata; };
This is really simple to implement; here is the whole implementation:
class ToLower_impl : public ToLower_skel { public: void process_indata(DataPacket<mcopbyte> *inpacket) { DataPacket<mcopbyte> *outpacket = outdata.allocPacket(inpacket->size); // convert to lowercase letters char *instring = (char *)inpacket->contents; char *outstring = (char *)outpacket->contents; for(int i=0;i<inpacket->size;i++) outstring[i] = tolower(instring[i]); inpacket->processed(); outpacket->send(); } }; REGISTER_IMPLEMENTATION(ToLower_impl);
Again, it is essential to name the method
process_
.
streamname
As you see, for each arriving packet you get a call for a function (the
process_indata
call in our case). You need to call
the processed()
method of a packet to indicate
you have processed it.
Here is an implementation tip: if processing takes longer (i.e. if you need to wait for soundcard output or something like that), don't call processed immediately, but store the whole data packet and call processed only as soon as you really processed that packet. That way, senders have a chance to know how long it really takes to do your work.
As synchronization isn't so nice with asynchronous streams, you should use synchronous streams wherever possible, and asynchronous streams only when necessary.
Suppose you have 2 objects, for example an AudioProducer and an AudioConsumer. The AudioProducer has an output stream and AudioConsumer has an input one. Each time you want to connect them, you will use those 2 streams. The first use of defaulting is to enable you to make the connection without specifying the ports in that case.
Now suppose the teo objects above can handle stereo, and each have a “left” and “right” port. You'd still like to connect them as easily as before. But how can the connecting system know which output port to connect to which input port? It has no way to correctly map the streams. Defaulting is then used to specify several streams, with an order. Thus, when you connect an object with 2 default output streams to another one with 2 default input streams, you don't have to specify the ports, and the mapping will be done correctly.
Of course, this is not limited to stereo. Any number of streams can be made default if needed, and the connect function will check that the number of defaults for 2 object match (in the required direction) if you don't specify the ports to use.
The syntax is as follows: in the IDL, you can use the default keyword in the stream declaration, or on a single line. For example:
interface TwoToOneMixer { default in audio stream input1, input2; out audio stream output; };
In this example, the object will expect its two input ports to be connected by default. The order is the one specified on the default line, so an object like this one:
interface DualNoiseGenerator { out audio stream bzzt, couic; default couic, bzzt; };
Will make connections from “couic” to “input1”, and “bzzt” to “input2” automatically. Note that since there is only one output for the mixer, it will be made default in this case (see below). The syntax used in the noise generator is useful to declare a different order than the declaration, or selecting only a few ports as default. The directions of the ports on this line will be looked up by mcopidl, so don't specify them. You can even mix input and output ports in such a line, only the order is important.
There are some rules that are followed when using inheritance:
If a default list is specified in the IDL, then use it. Parent ports can be put in this list as well, whether they were default in the parent or not.
Otherwise, inherit parent's defaults. Ordering is parent1 default1, parent1 default2..., parent2 default1... If there is a common ancestor using 2 parent branches, a “virtual public”-like merging is done at that default's first occurrence in the list.
If there is still no default and a single stream in a direction, use it as default for that direction.
Would you like to make a comment or contribute an update to this page?
Send feedback to the KDE Docs Team