7. Step F: What functionality has to be performed? Handling instream messages

7.1. The processBuffer()-method

For a plug-in programmer, the processBuffer()-method is the central part of a node. Within this method, multimedia data can be generated, processed, or consumed. Furthermore, new events are generated here. Remember that events are not handled within this method, but within the handler methods registered for the interfaces of the node. The signature of the processBuffer()-method is as follows.

Message* processBuffer(Buffer *);
      

7.2. General strategy - processor, converter, and filter nodes

The overall processing is the same for all node types: the processBuffer()-method is called and the node can return a message.

We will now further define the conditions under which the processBuffer()-method is called, what is passed as parameter, what can be returned, and what differences exist between the different node types (namely sources, processors, sinks, multiplexers, and demultiplexers). Do not worry if you do not understand all details, since the generic layer will provide all described functionality for you. Nevertheless, it is necessary to roughly understand the described mechanisms.

Let us first consider a GenericProcessor node. This node has a single input and a single output. It will try to get a message from its input only if a message can be passed to its output. A message can only be dequeued from an input if the predecessor of the node has already provided one. A node cannot pass a message to its output if its successor does not allow it, which is the case if the input of the successor is already congested.

Since nodes support not only downstream messages but also upstream messages, the same test is performed with the roles of input and output switched. Furthermore, since upstream messages are considered to be relatively rare and thus more important, this test is performed first. All following explanations focus on downstream messages; upstream messages are treated analogously.
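
The test described in the last two paragraphs can be modeled in a few lines. The following is a minimal, self-contained sketch and not NMM code: ToyQueue, ToyProcessor, and nextAction() are hypothetical names, and the fixed queue capacity is an assumption that stands in for a "congested" input.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical stand-in for a bounded message queue between two nodes.
struct ToyQueue {
  std::deque<int> messages;  // int is a placeholder for a message
  std::size_t capacity;

  explicit ToyQueue(std::size_t cap) : capacity(cap) { assert(cap > 0); }

  bool hasMessage() const { return !messages.empty(); }
  bool isCongested() const { return messages.size() >= capacity; }
};

enum class Action { HANDLE_UPSTREAM, HANDLE_DOWNSTREAM, WAIT };

// Toy model of a node with one input and one output.
struct ToyProcessor {
  ToyQueue* downstream_in;   // downstream messages from the predecessor
  ToyQueue* downstream_out;  // input queue of the successor
  ToyQueue* upstream_in;     // upstream messages from the successor
  ToyQueue* upstream_out;    // queue towards the predecessor

  // A message is only dequeued if it can also be passed on.
  // Upstream traffic is tested first.
  Action nextAction() const {
    if (upstream_in->hasMessage() && !upstream_out->isCongested())
      return Action::HANDLE_UPSTREAM;
    if (downstream_in->hasMessage() && !downstream_out->isCongested())
      return Action::HANDLE_DOWNSTREAM;
    return Action::WAIT;
  }
};
```

In this model a node waits both when nothing has arrived and when its successor is congested, which is why a slow sink eventually stalls all nodes in front of it.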

If the above criterion is satisfied, a message will be dequeued. If the message is an event (or more precisely a composite event), it will be handled by calling the registered methods. The message will then be passed on, either downstream (if it was dequeued from an input) or upstream (if it was dequeued from an output). Additionally, you can delete events from a composite event, insert new events, or manipulate the parameters associated with an event.

Result MyNode::handleEventAndDelete() 
{
  // return DELETE to delete current instream event
  return DELETE;
}

Result MyNode::handleEventAndInsertNewEvent() 
{
  //create the new event.
  Event* new_event = ISomeInterface::create_Foo("Add a new Event");

  //insert the event
  insertEvent(new_event);

  return SUCCESS;
}

Result MyNode::handleEventAndReplace() 
{
  //create the new event.
  Event* new_event = ISomeInterface::create_Foo("Add a new Event");

  //insert the event
  insertEvent(new_event);

  // return DELETE to delete current instream event 
  return DELETE;
}

Result MyNode::handleEventAndManipulateParameters(int &i) 
{
  // change value
  ++i;

  return SUCCESS;
}
      

The create_-methods used above are automatically generated for instream events by the IDL compilers (see NMM IDL documentation).

If the message is a buffer, the processBuffer()-method will be called with the buffer passed as parameter. The node can modify the data in the buffer and return the same buffer; such a processor node is a filter. Notice that the node then needs to request a writable instance of the current buffer, since every buffer (or every message in general) might be referenced in different parts of a flow graph at the same time.

Message* MyNode::processBuffer(Buffer *in_buffer)
{
  // check reference count of buffer
  in_buffer = in_buffer->getWriteableInstance();
  
  // get data of buffer
  char* p = in_buffer->getData();

  // modify data ...

  return in_buffer;
}
      

The method getWriteableInstance() checks the reference count and returns a copy of the buffer if necessary.
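
The copy-on-write idea behind this method can be illustrated with a small stand-alone model. This is a sketch under assumptions, not the NMM implementation: ToyBuffer, its ref_count field, and writableInstance() are hypothetical, and the real class may manage references differently.

```cpp
#include <cassert>
#include <string>

// Hypothetical reference-counted buffer (not the NMM Buffer class).
struct ToyBuffer {
  std::string data;
  int ref_count;

  explicit ToyBuffer(const std::string& d) : data(d), ref_count(1) {}
};

// Returns a buffer that the caller may modify.
// If the caller holds the only reference, the buffer itself is returned.
// Otherwise the caller's reference is dropped and a private copy is returned.
ToyBuffer* writableInstance(ToyBuffer* buffer) {
  assert(buffer != nullptr && buffer->ref_count > 0);
  if (buffer->ref_count == 1) {
    return buffer;
  }
  --buffer->ref_count;
  return new ToyBuffer(buffer->data);
}
```

Because the returned pointer may differ from the one passed in, the result has to be assigned back, as in_buffer is in the example above.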

The node could also request a new buffer (perhaps with a different size), fill that buffer with data and finally return it. The incoming buffer then has to be released.

Message* MyNode::processBuffer(Buffer *in_buffer)
{
  // get new buffer with other size
  Buffer* out_buffer = getNewBuffer( in_buffer->getSize() * 2 );
  
  // get data of buffer
  char* p = in_buffer->getData();

  // some code ...

  // release in_buffer
  in_buffer->release();

  return out_buffer;
}
      

The node could also release the buffer and return null (or only release every second buffer). Of course, if null is returned, nothing will be forwarded to the successor of the node.

Message* MyNode::processBuffer(Buffer *in_buffer)
{
  in_buffer->release();

  return 0;
}
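
The variant mentioned in parentheses, releasing only every second buffer, can be sketched as follows. The example is self-contained and does not use the NMM classes: ToyBuffer and ToyDecimator are hypothetical, and a plain flag stands in for the effect of release().

```cpp
#include <cassert>

// Hypothetical minimal buffer with a release() method (not the NMM class).
struct ToyBuffer {
  bool released = false;
  void release() { released = true; }
};

// Toy node that forwards every second buffer and drops the others.
class ToyDecimator {
public:
  // Returns the buffer to forward, or a null pointer if nothing is forwarded.
  ToyBuffer* processBuffer(ToyBuffer* in_buffer) {
    assert(in_buffer != nullptr);
    const bool drop = (count_++ % 2 == 1);
    if (drop) {
      in_buffer->release();
      return nullptr;
    }
    return in_buffer;
  }

private:
  unsigned count_ = 0;  // number of buffers seen so far
};
```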
      

The node could also release the buffer and return a newly created instream event instead.

Message* MyNode::processBuffer(Buffer *in_buffer)
{
  in_buffer->release();

  // return new event  
  return 
    new CEvent( ISomeInterface::create_Foo("new Event") );
}
      

7.3. The working-flag

But what if a node wants to create two or more buffers out of one? Then the working-flag has to be set. If this flag is true, processBuffer() is called with a null-pointer even if a message (i.e. a buffer or an event) could be dequeued. This flag should be set to true in two cases.

First, the node receives one buffer as input and produces more than one buffer or event as output.

Message* MyNode::processBuffer(Buffer *in_buffer)
{
  Buffer* out_buffer = 0;

  if(!getWorkingFlag()) { 
    // if the working-flag is false, processBuffer() was
    // triggered with new in_buffer -> start processing
    setWorkingFlag(true);  

    // save buffer for next processBuffer()
    current_in_buffer = in_buffer;

    // get new out_buffer 
    out_buffer = getNewBuffer(some_size);

    // some code ...

  }
  else { 
    // working-flag is true
    // -> processBuffer() was triggered with 0-pointer
    // -> continue work on old in_buffer

    // get new out_buffer 
    out_buffer = getNewBuffer(some_size);

    // some code ...

    if(finished_current_in_buffer) {
      // release current in_buffer
      current_in_buffer->release();

      // reset working-flag so that the next message can be dequeued
      setWorkingFlag(false);
    }
  }

  // both cases -> return out_buffer
  return out_buffer;
}
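
To see how the flag interacts with the calling side, the following stand-alone sketch models both the node and a simplified driver loop. It is not NMM code: ToySplitter, process(), and run() are hypothetical, a std::string stands in for a buffer, and the loop is only an assumption about what the generic layer does on your behalf.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Toy node that splits one input string into chunks of at most
// chunk_size characters.
class ToySplitter {
public:
  explicit ToySplitter(std::size_t chunk_size) : chunk_size_(chunk_size) {
    assert(chunk_size > 0);
  }

  bool getWorkingFlag() const { return working_; }

  // in is a null pointer while the working-flag is set.
  // Returns false if no output was produced.
  bool process(const std::string* in, std::string& out) {
    if (!working_) {
      if (in == nullptr || in->empty()) return false;
      current_ = *in;  // keep the input for the following calls
      offset_ = 0;
      working_ = true;
    }
    out = current_.substr(offset_, chunk_size_);
    offset_ += out.size();
    if (offset_ >= current_.size()) {
      working_ = false;  // input used up: the next message may be dequeued
    }
    return true;
  }

private:
  std::size_t chunk_size_;
  std::string current_;
  std::size_t offset_ = 0;
  bool working_ = false;
};

// Toy driver loop: a message is only dequeued while the working-flag is false.
std::vector<std::string> run(ToySplitter& node, std::deque<std::string> input) {
  std::vector<std::string> output;
  while (node.getWorkingFlag() || !input.empty()) {
    std::string in, out;
    const std::string* arg = nullptr;
    if (!node.getWorkingFlag()) {
      in = input.front();
      input.pop_front();
      arg = &in;
    }
    if (node.process(arg, out)) output.push_back(out);
  }
  return output;
}
```

Note that the model resets the flag as soon as the stored input is used up. If the flag stayed true, the driver would never dequeue another message.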
      

Second, the node receives an event (either instream or out-of-band) and wants to produce one or more buffers or events in response.

// some registered event handler
Result MyNode::handleSomeEvent() 
{
  // set working-flag to indicate that buffers or events
  // can be produced without incoming buffers
  setWorkingFlag(true);

  // set flag to indicate that new buffers should be generated
  produce_new_out_buffer = true;
  
  return SUCCESS;
}

Result MyNode::handleSomeOtherEvent() 
{
  // set working-flag to indicate that buffers or events
  // can be produced without incoming buffers
  setWorkingFlag(true);

  // set flag to indicate that new event should be generated
  produce_new_event = true;
  
  return SUCCESS;
}

Message* MyNode::processBuffer(Buffer *in_buffer)
{
  if(!getWorkingFlag()) { 
    // if the working-flag is false, processBuffer() 
    // was triggered with new in_buffer 
    // -> start processing in_buffer

    // get new out_buffer 
    Buffer* out_buffer = getNewBuffer(some_size);

    // some code ...

    // do not forget to release in_buffer
    in_buffer->release();

    return out_buffer;
  }
  else { 
    // working-flag was set to true in 
    // MyNode::handleSomeEvent() or MyNode::handleSomeOtherEvent()
    // -> processBuffer() was triggered with 0-pointer
    // -> generate new buffers or events

    // flag was set to true in MyNode::handleSomeEvent()
    if(produce_new_out_buffer) {
      // get new out_buffer 
      Buffer* out_buffer = getNewBuffer(some_size);
 
      // some code ...

      return out_buffer;
    }
    // flag was set to true in MyNode::handleSomeOtherEvent()
    else if(produce_new_event) {
      // some code ...

      // return new event  
      return 
        new CEvent( ISomeInterface::create_Foo("new Event") );
    }
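    else {
      // nothing left to produce -> reset working-flag so that
      // incoming messages are dequeued again
      setWorkingFlag(false);
      return 0;
    }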
  }
}
      

Notice: the access to the working-flag is not guarded by a mutex. If you need this feature, you will have to implement a wrapper method.

7.4. Upstream and downstream messages

Messages created within processBuffer() can be sent either upstream or downstream by setting the direction of the message. A typical processBuffer()-method might look similar to this.

Message* MyNode::processBuffer(Buffer* in_buffer) 
{
  // some other code ...

  // switch between different states 
  switch(state) {
  case CREATE_SOME_DOWNSTREAM_EVENT:
    // default direction for all messages is downstream, 
    // so simply create new event and return it
    return 
      new CEvent( ISomeInterface::create_Foo("some downstream event") );
  case CREATE_SOME_UPSTREAM_EVENT: {
    // default direction for all messages is downstream, 
    // so set direction to upstream and return it
    // (the braces limit the scope of e to this case)
    CEvent *e = 
      new CEvent( ISomeInterface::create_Foo("some upstream event") );
    e->setDirection(UPSTREAM);
    return e;
  }
  case DATA: {
    // get out buffer 
    // default direction for all messages is downstream, 
    // so simply fill the buffer and return it
    Buffer* out_buffer = getNewBuffer(some_size);

    // do real work 

    // some code ...

    // release in_buffer
    in_buffer->release();

    // return buffer
    return out_buffer;
  }
  default:
    // error, return null
    ERROR_STREAM ("No internal state found!" << endl);
    if(in_buffer) {
      in_buffer->release();
    }
    return 0;
  }
}
      

7.5. Sink nodes

As a sink node is not connected to a successor, it should always return a null-pointer from its processBuffer()-method. Do not forget to release incoming buffers.

7.6. Source nodes and the producing-flag

As a source node is not connected to a predecessor, its processBuffer()-method is called with a null-pointer. But remember that a source might also receive upstream messages. If the working-flag were always set to true for sources, these messages would never be dequeued (reread the explanation of the working-flag if this is not clear to you). Therefore, another flag is used: the producing-flag. If this flag is true, processBuffer() is called with a null-pointer unless there is a message (i.e. a buffer or an event) that can be dequeued.

This flag should mainly be set to true in one case: the node is a source node and wants to produce messages all the time while still being able to process newly received messages. GenericSourceNode sets this flag to true by default; all other generic nodes set it to false. Other nodes might set this flag to true or false depending on a timer (e.g. to achieve a certain buffer output rate).
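
The difference between the two flags can be summarized as a small decision function. This is only a sketch of the rules as stated in this chapter, not the actual scheduling code of the generic layer; Call and nextCall() are hypothetical names.

```cpp
#include <cassert>

enum class Call {
  WITH_MESSAGE,  // dequeue a message and handle it
  WITH_NULL,     // call processBuffer() with a null-pointer
  NONE           // nothing to do
};

// Toy decision function for one scheduling step of a node.
// queued_messages is the number of messages that could be dequeued.
Call nextCall(bool working_flag, bool producing_flag, int queued_messages) {
  assert(queued_messages >= 0);
  if (working_flag) return Call::WITH_NULL;            // messages have to wait
  if (queued_messages > 0) return Call::WITH_MESSAGE;  // messages take priority
  if (producing_flag) return Call::WITH_NULL;          // idle: keep producing
  return Call::NONE;
}
```

With only the producing-flag set, a queued upstream message is handled before the next buffer is produced, which is the behavior a source node needs.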

Notice: the access to the producing-flag is not guarded by a mutex. If you need this feature, you will have to implement a wrapper method.

7.7. Multiplexer nodes

A multiplexer node has one output and several inputs. By default, all of its inputs are enabled. When trying to dequeue a message from the inputs of a multiplexer, all enabled inputs are tested for messages. If a message was dequeued from an input, another input will be preferred next time. More precisely, the next enabled input in the list of all inputs will be queried first. This strategy is called round robin. If an input is not enabled, it will not be queried, even if messages are available. The input from which the current message was dequeued can be queried by calling getCurrentRecvInputStream(). The following example shows a multiplexer with two inputs. By enabling only one input at a time, buffers are dequeued from the two inputs in strict alternation.

Result MyNode::doInit() 
{
  // some code ... 

  // add two inputs, both are enabled per default
  addInputStream("input1");
  addInputStream("input2");

  // disable input2
  setRecvInputStreamEnabled("input2", false);

  return SUCCESS;
}

Message* MyNode::processBuffer(Buffer* in_buffer) 
{
  if(!strcmp(getCurrentRecvInputStream(), "input1")) {
    // disable input1, enable input2
    setRecvInputStreamEnabled("input1", false);
    setRecvInputStreamEnabled("input2", true);

    // some code ... 
  }
  else if(!strcmp(getCurrentRecvInputStream(), "input2")) {
    // disable input2, enable input1
    setRecvInputStreamEnabled("input2", false);
    setRecvInputStreamEnabled("input1", true);

    // some code ... 
  }

  // forward the buffer (or return a newly created message instead)
  return in_buffer;
}
      

The next example shows a multiplexer with both of its inputs enabled. The input from which the current buffer was dequeued is queried by calling the getCurrentRecvInputStream()-method. None of the inputs will be disabled at any time. The round robin dequeueing strategy will then decide which input to choose.

Result MyNode::doInit() 
{
  // some code ... 

  // add two inputs, both are enabled per default
  addInputStream("input1");
  addInputStream("input2");

  return SUCCESS;
}

Message* MyNode::processBuffer(Buffer* in_buffer) 
{
  if(!strcmp(getCurrentRecvInputStream(), "input1")) {
    // some code ... 
  }
  else if(!strcmp(getCurrentRecvInputStream(), "input2")) {
    // some code ... 
  }

  // forward the buffer (or return a newly created message instead)
  return in_buffer;
}
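
The round robin strategy itself is implemented by the generic layer, so you do not have to write it. For illustration only, it can be modeled as follows. ToyInput and ToyMultiplexer are hypothetical names, and the details (such as the starting position after a successful dequeue) are assumptions based on the description above.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of one multiplexer input.
struct ToyInput {
  std::string name;
  bool enabled;
  std::deque<int> messages;  // int is a placeholder for a message
};

class ToyMultiplexer {
public:
  explicit ToyMultiplexer(std::vector<ToyInput> inputs)
      : inputs_(std::move(inputs)) {}

  ToyInput& input(std::size_t i) {
    assert(i < inputs_.size());
    return inputs_[i];
  }

  // Tries to dequeue one message. The search starts at the input after
  // the one that delivered the previous message and skips disabled inputs.
  // Returns the name of the input used, or an empty string if none was ready.
  std::string dequeue(int& message) {
    const std::size_t n = inputs_.size();
    for (std::size_t step = 0; step < n; ++step) {
      const std::size_t i = (next_ + step) % n;
      ToyInput& in = inputs_[i];
      if (in.enabled && !in.messages.empty()) {
        message = in.messages.front();
        in.messages.pop_front();
        next_ = (i + 1) % n;
        return in.name;
      }
    }
    return std::string();
  }

private:
  std::vector<ToyInput> inputs_;
  std::size_t next_ = 0;  // index of the input that is queried first
};
```

If one input has no messages, the model simply continues with the next enabled input, so a stalled input does not block the others.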
      

7.8. Demultiplexer nodes

A demultiplexer node has one input and several outputs. To specify to which output a message is to be sent when returning from processBuffer(), the setCurrentSendOutputStream()-method is used.

Result MyNode::doInit() 
{
  // some code ... 

  // add two outputs
  addOutputStream("output1");
  addOutputStream("output2");

  return SUCCESS;
}

Message* MyNode::processBuffer(Buffer* in_buffer) 
{
  if( /* send generated buffer or event to output1 ? */ ) {
    setCurrentSendOutputStream("output1");

    // some code ... 

    return my_output_message;
  }

  if( /* send generated buffer or event to output2 ? */ ) {
    setCurrentSendOutputStream("output2");

    // some code ... 

    return my_output_message;
  }

  // no message was generated for any output
  return 0;
}
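
The division of work between the node and the generic layer can be modeled in a stand-alone sketch: the node selects the output and returns the message, and the calling side delivers it. ToyDemultiplexer and step() are hypothetical, an int stands in for a message, and only the two method names are borrowed from the example above.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Toy demultiplexer that sends even values to "output1" and odd values
// to "output2".
class ToyDemultiplexer {
public:
  void addOutputStream(const std::string& name) { outputs_[name]; }

  void setCurrentSendOutputStream(const std::string& name) {
    assert(outputs_.count(name) == 1);  // the output has to exist
    current_ = name;
  }

  // Node logic: choose the output, then return the message.
  int processBuffer(int value) {
    setCurrentSendOutputStream(value % 2 == 0 ? "output1" : "output2");
    return value;
  }

  // Calling side: deliver the returned message to the selected output.
  void step(int value) {
    const int message = processBuffer(value);
    outputs_[current_].push_back(message);
  }

  const std::vector<int>& received(const std::string& name) const {
    return outputs_.at(name);
  }

private:
  std::map<std::string, std::vector<int> > outputs_;
  std::string current_;  // output selected for the next returned message
};
```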
      

7.9. Multiplexer-demultiplexer nodes

The GenericMuxDemuxNode simply combines a multiplexer and a demultiplexer. This means that this node supports more than one input and more than one output.

7.10. Developing a processBuffer()-method for a new node

That is all you need to know, and that is all we can tell you. Unfortunately, we cannot tell you how to program plug-in X. As always, there are several ways to realize the desired functionality within a given programming model. Probably the best way to get started is to study existing nodes.

7.11. Summary

You might now think that all different types of nodes can be represented by a GenericMuxDemuxNode. That is right. In fact, the class GenericNode, which is the superclass of all other generic nodes, is, roughly speaking, a node with n inputs and m outputs. The subclasses only set the correct numbers of inputs and outputs.

Well, now it is time to open your editor and do some coding. If everything is running and you have already created a small example application with your new node, you can go to the next step to see how to register your node with the NMM registry.