PacketStream
PacketStream
#include <icy/packetstream.h>Inherits:
Stateful< PacketStreamState >
Processes and broadcasts IPackets through a configurable adapter graph.
A PacketStream consists of one or many PacketSources, one or many PacketProcessors, and one or many delegate receivers.
This class enables the developer to setup a processor chain in order to perform arbitrary processing on data packets using interchangeable packet adapters, and pump the output to any delegate function or even another PacketStream.
Note that PacketStream itself inherits from PacketStreamAdapter, so a PacketStream can be the source of another PacketStream.
All PacketStream methods are thread-safe, but once the stream is running you will not be able to attach or detach stream adapters.
In order to synchronize output packets with the application event loop take a look at the SyncPacketQueue class. For lengthy operations you can add an AsyncPacketQueue to the start of the stream to defer processing from the PacketSource thread.
Public Attributes
| Return | Name | Description |
|---|---|---|
PacketSignal | emitter | Signals to delegates on outgoing packets. |
ThreadSignal< void(PacketStream &, const std::exception_ptr &)> | Error | Signals that the PacketStream is in Error state. If stream output is synchronized then the Error signal will be sent from the synchronization context, otherwise it will be sent from the async processor context. See synchronizeOutput() |
ThreadSignal< void(PacketStream &)> | Close | Signals that the PacketStream is in Close state. This signal is sent immediately via the close() method, and as such will be sent from the calling thread context. |
emitter
PacketSignal emitterSignals to delegates on outgoing packets.
Error
ThreadSignal< void(PacketStream &, const std::exception_ptr &)> ErrorSignals that the PacketStream is in Error state. If stream output is synchronized then the Error signal will be sent from the synchronization context, otherwise it will be sent from the async processor context. See synchronizeOutput()
Close
ThreadSignal< void(PacketStream &)> CloseSignals that the PacketStream is in Close state. This signal is sent immediately via the close() method, and as such will be sent from the calling thread context.
Public Methods
| Return | Name | Description |
|---|---|---|
PacketStream | Construct a named packet stream. | |
~PacketStream virtual | Destroy the stream; calls close() then reset() to release all adapters. | |
PacketStream | Deleted constructor. | |
PacketStream | Deleted constructor. | |
void | start virtual | Start the stream and synchronized sources. |
void | stop virtual | Stop the stream and synchronized sources. |
void | pause virtual | Pause the stream. |
void | resume virtual | Resume the stream. |
void | close virtual | Close the stream and transition the internal state to Closed. |
void | reset virtual | Cleanup all managed stream adapters and reset the stream state. |
bool | active virtual const | Returns true when the stream is in the Active state. |
bool | stopped virtual const | Returns true when the stream is in the Stopping or Stopped state. |
bool | closed virtual const | Returns true when the stream is in the Closed or Error state. |
bool | lock virtual | Sets the stream to locked state. In a locked state no new adapters can be added or removed from the stream until the stream is stopped. |
bool | locked virtual const | Returns true is the stream is currently locked. |
void | write virtual | Write a mutable buffer into the stream without copying. The caller must keep the buffer alive until processing crosses a Cloned/Retained boundary or, if the graph is fully synchronous, until write() returns. |
void | write virtual | Write a read-only buffer into the stream; data is copied immediately into an owning RawPacket before any adapter sees it. |
void | write virtual | Write a packet directly into the processing chain. |
void | attachSource virtual | Attach a bare packet signal as a stream source. The signal is wrapped in an unowned PacketStreamAdapter internally. Useful when the source is another PacketStream::emitter. |
void | attachSource virtual | Attach a PacketStreamAdapter as a source. Source adapters default to Borrowed retention unless overridden; they must not retain inbound packet storage asynchronously without reporting Cloned or Retained. |
void | attachSource inline | Attach a shared_ptr-managed source adapter to the stream. The stream shares ownership; the adapter is kept alive at least until teardown. Throws std::runtime_error if ptr does not derive from PacketStreamAdapter. |
bool | detachSource virtual | Detach a source by its packet signal. Disconnects the signal from the stream's process slot and removes the adapter entry. |
bool | detachSource virtual | Detach a source by its adapter pointer. Disconnects the adapter's emitter from the stream's process slot and removes the entry. |
void | attach virtual | Attach a packet processor to the stream. Processors are executed in ascending order of their order value. Pass order = -1 to append at the end of the current processor list. Valid range is -1 to 101; values outside this range throw std::invalid_argument. Borrowed processors must finish with the packet before process() returns. Queue/processors that defer work must report Cloned or Retained via retention() so upstream code has an explicit ownership boundary. |
void | attach inline | Attach a shared_ptr-managed processor to the stream. The stream shares ownership; the processor is kept alive at least until teardown. Throws std::runtime_error if ptr does not derive from PacketProcessor. |
bool | detach virtual | Detach a packet processor from the stream. The processor's delegate connections are removed; ownership is released if held. |
void | synchronizeOutput virtual | Synchronize stream output packets with a libuv event loop. Internally attaches a SyncPacketQueue at order 101 so that all packets emitted by the processor chain are dispatched from the loop thread rather than the source thread. Must be called before start(). |
void | autoStart virtual | Enable or disable auto-start behaviour (default: false). When enabled, the stream automatically transitions to Active state upon receiving the first packet while in the None or Locked state. Must be called before start(). |
void | closeOnError virtual | Enable or disable close-on-error behaviour (default: true). When enabled, an unhandled processor exception causes the stream to transition from Error to Closed state automatically. |
const std::exception_ptr & | error | Accessors for the unmanaged client data pointer. |
std::string | name const | Return the name assigned to this stream at construction. |
PacketAdapterVec | adapters const | Returns a combined list of all stream sources and processors. |
PacketAdapterVec | sources const | Returns a list of all stream sources. |
PacketAdapterVec | processors const | Returns a list of all stream processors. |
int | numSources const | Return the number of source adapters currently registered. |
int | numProcessors const | Return the number of processor adapters currently registered. |
int | numAdapters const | Return the total number of adapters (sources + processors). |
AdapterT * | getSource inline | Return the nth source of type AdapterT, or nullptr if not found. Sources are searched in their registered order; only adapters that dynamic_cast successfully to AdapterT are counted. |
AdapterT * | getProcessor inline | Return the nth processor of type AdapterT, or nullptr if not found. Processors are searched in their registered order; only adapters that dynamic_cast successfully to AdapterT are counted. |
PacketProcessor * | getProcessor inline | Return the processor registered at a specific order value. Unlike the template overload, this searches by order rather than by type and index. |
PacketStream
PacketStream(const std::string & name)Construct a named packet stream.
Parameters
nameOptional human-readable name used in log output.
~PacketStream
virtual
virtual ~PacketStream()Destroy the stream; calls close() then reset() to release all adapters.
PacketStream
PacketStream(const PacketStream &) = deleteDeleted constructor.
PacketStream
PacketStream(PacketStream &&) = deleteDeleted constructor.
start
virtual
virtual void start()Start the stream and synchronized sources.
stop
virtual
virtual void stop()Stop the stream and synchronized sources.
pause
virtual
virtual void pause()Pause the stream.
resume
virtual
virtual void resume()Resume the stream.
close
virtual
virtual void close()Close the stream and transition the internal state to Closed.
reset
virtual
virtual void reset()Cleanup all managed stream adapters and reset the stream state.
active
virtual const
virtual bool active() constReturns true when the stream is in the Active state.
stopped
virtual const
virtual bool stopped() constReturns true when the stream is in the Stopping or Stopped state.
closed
virtual const
virtual bool closed() constReturns true when the stream is in the Closed or Error state.
lock
virtual
virtual bool lock()Sets the stream to locked state. In a locked state no new adapters can be added or removed from the stream until the stream is stopped.
locked
virtual const
virtual bool locked() constReturns true is the stream is currently locked.
write
virtual
virtual void write(char * data, size_t len)Write a mutable buffer into the stream without copying. The caller must keep the buffer alive until processing crosses a Cloned/Retained boundary or, if the graph is fully synchronous, until write() returns.
Parameters
dataPointer to the raw data buffer.lenNumber of bytes to process.
write
virtual
virtual void write(const char * data, size_t len)Write a read-only buffer into the stream; data is copied immediately into an owning RawPacket before any adapter sees it.
Parameters
dataPointer to the raw data buffer.lenNumber of bytes to process.
write
virtual
virtual void write(IPacket && packet)Write a packet directly into the processing chain.
Parameters
packetPacket to process; moved into the stream.
attachSource
virtual
virtual void attachSource(PacketSignal & source)Attach a bare packet signal as a stream source. The signal is wrapped in an unowned PacketStreamAdapter internally. Useful when the source is another PacketStream::emitter.
Parameters
sourceThe packet signal to attach; must outlive the stream.
attachSource
virtual
virtual void attachSource(PacketStreamAdapter * source, bool owned, bool syncState)Attach a PacketStreamAdapter as a source. Source adapters default to Borrowed retention unless overridden; they must not retain inbound packet storage asynchronously without reporting Cloned or Retained.
Parameters
sourceThe adapter to attach; must not be null.ownedIf true the stream takes ownership and deletes the pointer on teardown.syncStateIf true andsourceimplements basic::Startable, its start()/stop() will be called by startSources()/stopSources().
attachSource
inline
template<class C> inline void attachSource(std::shared_ptr< C > ptr, bool syncState)Attach a shared_ptr-managed source adapter to the stream. The stream shares ownership; the adapter is kept alive at least until teardown. Throws std::runtime_error if ptr does not derive from PacketStreamAdapter.
Parameters
CAdapter type; must derive from PacketStreamAdapter.
Parameters
ptrShared pointer to the adapter instance.syncStateIf true andptrimplements basic::Startable, its start()/stop() will be called by startSources()/stopSources().
detachSource
virtual
virtual bool detachSource(PacketSignal & source)Detach a source by its packet signal. Disconnects the signal from the stream's process slot and removes the adapter entry.
Parameters
sourceThe packet signal previously attached via attachSource(PacketSignal&).
Returns
true if the source was found and removed, false otherwise.
detachSource
virtual
virtual bool detachSource(PacketStreamAdapter * source)Detach a source by its adapter pointer. Disconnects the adapter's emitter from the stream's process slot and removes the entry.
Parameters
sourcePointer to the adapter previously attached.
Returns
true if the source was found and removed, false otherwise.
attach
virtual
virtual void attach(PacketProcessor * proc, int order, bool owned)Attach a packet processor to the stream. Processors are executed in ascending order of their order value. Pass order = -1 to append at the end of the current processor list. Valid range is -1 to 101; values outside this range throw std::invalid_argument. Borrowed processors must finish with the packet before process() returns. Queue/processors that defer work must report Cloned or Retained via retention() so upstream code has an explicit ownership boundary.
Parameters
procThe processor to attach; must not be null.orderPosition in the processing chain (lower runs first).ownedIf true the stream takes ownership and deletes the pointer on teardown.
attach
inline
template<class C> inline void attach(std::shared_ptr< C > ptr, int order, bool syncState)Attach a shared_ptr-managed processor to the stream. The stream shares ownership; the processor is kept alive at least until teardown. Throws std::runtime_error if ptr does not derive from PacketProcessor.
Parameters
CProcessor type; must derive from PacketProcessor.
Parameters
ptrShared pointer to the processor instance.orderPosition in the processing chain (lower runs first).syncStateReserved for future use; currently unused.
detach
virtual
virtual bool detach(PacketProcessor * proc)Detach a packet processor from the stream. The processor's delegate connections are removed; ownership is released if held.
Parameters
procPointer to the processor to remove.
Returns
true if the processor was found and removed, false otherwise.
synchronizeOutput
virtual
virtual void synchronizeOutput(uv::Loop * loop)Synchronize stream output packets with a libuv event loop. Internally attaches a SyncPacketQueue at order 101 so that all packets emitted by the processor chain are dispatched from the loop thread rather than the source thread. Must be called before start().
Parameters
loopThe event loop to synchronize output onto; must not be null.
autoStart
virtual
virtual void autoStart(bool flag)Enable or disable auto-start behaviour (default: false). When enabled, the stream automatically transitions to Active state upon receiving the first packet while in the None or Locked state. Must be called before start().
Parameters
flagtrue to enable auto-start, false to disable.
closeOnError
virtual
virtual void closeOnError(bool flag)Enable or disable close-on-error behaviour (default: true). When enabled, an unhandled processor exception causes the stream to transition from Error to Closed state automatically.
Parameters
flagtrue to close the stream on error, false to remain in Error state.
error
const std::exception_ptr & error()Accessors for the unmanaged client data pointer.
Return the last captured exception, if the stream is in Error state. The pointer is null when no error has occurred.
Returns
A reference to the stored exception_ptr; empty if no error.
name
const
std::string name() constReturn the name assigned to this stream at construction.
Returns
The stream name; empty string if none was provided.
adapters
const
PacketAdapterVec adapters() constReturns a combined list of all stream sources and processors.
sources
const
PacketAdapterVec sources() constReturns a list of all stream sources.
processors
const
PacketAdapterVec processors() constReturns a list of all stream processors.
numSources
const
int numSources() constReturn the number of source adapters currently registered.
Returns
Source count; thread-safe.
numProcessors
const
int numProcessors() constReturn the number of processor adapters currently registered.
Returns
Processor count; thread-safe.
numAdapters
const
int numAdapters() constReturn the total number of adapters (sources + processors).
Returns
Combined adapter count; thread-safe.
getSource
inline
template<class AdapterT> inline AdapterT * getSource(int index)Return the nth source of type AdapterT, or nullptr if not found. Sources are searched in their registered order; only adapters that dynamic_cast successfully to AdapterT are counted.
Parameters
AdapterTTarget type; must derive from PacketStreamAdapter.
Parameters
indexZero-based index among matching sources (default 0).
Returns
Pointer to the matching adapter, or nullptr.
getProcessor
inline
template<class AdapterT> inline AdapterT * getProcessor(int index)Return the nth processor of type AdapterT, or nullptr if not found. Processors are searched in their registered order; only adapters that dynamic_cast successfully to AdapterT are counted.
Parameters
AdapterTTarget type; must derive from PacketProcessor.
Parameters
indexZero-based index among matching processors (default 0).
Returns
Pointer to the matching processor, or nullptr.
getProcessor
inline
inline PacketProcessor * getProcessor(int order)Return the processor registered at a specific order value. Unlike the template overload, this searches by order rather than by type and index.
Parameters
orderThe order value to match (default 0).
Returns
Pointer to the matching processor, or nullptr if none registered at that order.
Protected Attributes
| Return | Name | Description |
|---|---|---|
std::mutex | _mutex | |
std::mutex | _procMutex | |
std::string | _name | |
PacketAdapterVec | _sources | |
PacketAdapterVec | _processors | |
std::deque< PacketStreamState > | _states | |
std::exception_ptr | _error | |
bool | _autoStart | |
bool | _closeOnError | |
bool | _wired |
_mutex
std::mutex _mutex_procMutex
std::mutex _procMutex_name
std::string _name_sources
PacketAdapterVec _sources_processors
PacketAdapterVec _processors_states
std::deque< PacketStreamState > _states_error
std::exception_ptr _error_autoStart
bool _autoStart_closeOnError
bool _closeOnError_wired
bool _wiredProtected Methods
| Return | Name | Description |
|---|---|---|
void | setup | Attach the source and processor delegate chain. |
void | teardown | Detach the source and processor delegate chain. |
void | attachSource | |
void | attach | |
void | startSources | Start synchronized sources. |
void | stopSources | Stop synchronized sources. |
void | process virtual | Process incoming packets. |
void | emit | Emit the final packet to listeners. |
void | synchronizeStates | Synchronize queued states with adapters. |
void | onStateChange virtual | Override the Stateful::onStateChange method. |
void | assertCanModify | Returns true if the given state ID is queued. |
void | handleException | Handle an internal exception. |
setup
void setup()Attach the source and processor delegate chain.
teardown
void teardown()Detach the source and processor delegate chain.
attachSource
void attachSource(PacketAdapterReference::Ptr ref)attach
void attach(PacketAdapterReference::Ptr ref)startSources
void startSources()Start synchronized sources.
stopSources
void stopSources()Stop synchronized sources.
process
virtual
virtual void process(IPacket & packet)Process incoming packets.
emit
void emit(IPacket & packet)Emit the final packet to listeners.
Synchronized signals such as Close and Error are sent from this method. See synchronizeOutput()
synchronizeStates
void synchronizeStates()Synchronize queued states with adapters.
onStateChange
virtual
virtual void onStateChange(PacketStreamState & state, const PacketStreamState & oldState)Override the Stateful::onStateChange method.
assertCanModify
void assertCanModify()Returns true if the given state ID is queued.
Asserts that the stream can be modified, ie is not in the Locked, Stopping or Active states.
handleException
void handleException(std::exception & exc)Handle an internal exception.
Public Types
| Name | Description |
|---|---|
Ptr |
Ptr
std::shared_ptr< PacketStream > Ptr()