Package org.red5.server.stream.consumer
Class FileConsumer
java.lang.Object
org.red5.server.stream.consumer.FileConsumer
- All Implemented Interfaces:
IConsumer, IMessageComponent, IPipeConnectionListener, IPushableConsumer, Constants, DisposableBean, InitializingBean
public class FileConsumer
extends Object
implements Constants, IPushableConsumer, IPipeConnectionListener, InitializingBean, DisposableBean
Consumer that pushes messages to file. Used when recording live streams.
- Author:
- The Red5 Project, Paul Gregoire, Vladimir Hmelyoff, Octavian Naicu
-
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
private static final class | FileConsumer.QueuedData | Queued data wrapper.
-
Field Summary
Fields
Modifier and Type | Field | Description
private ITag | audioConfigurationTag | Audio decoder configuration
private boolean | delayWrite | Whether or not to use a queue for delaying file writes.
private File | file | File
private AtomicBoolean | gotVideoKeyFrame |
private int | lastTimestamp | Last write timestamp
private int | lastWrittenTs | Tracks the last timestamp written to prevent backwards time stamped data.
private static final org.slf4j.Logger | log |
private String | mode | Operation mode
private int | percentage | Percentage of the queue which is sliced for writing
private PriorityQueue<FileConsumer.QueuedData> | queue | Queue to hold data for delayed writing
private int | queueThreshold | Number of queued items needed before writes are initiated
private Lock | readLock | Read lock
private ReentrantReadWriteLock | reentrantLock | Reentrant lock
private ScheduledExecutorService | scheduledExecutorService | Executor for all instance writer jobs
private int | schedulerThreadSize | Queue writer thread count
private IScope | scope | Scope
private int | startTimestamp | Start timestamp
private ITag | videoConfigurationTag | Video decoder configuration
private Lock | writeLock | Write lock
private ITagWriter | writer | Tag writer
private Future<?> | writerFuture | Keeps track of the last spawned write worker.
Fields inherited from interface org.red5.server.net.rtmp.message.Constants
HANDSHAKE_SIZE, HEADER_CONTINUE, HEADER_NEW, HEADER_SAME_SOURCE, HEADER_TIMER_CHANGE, MEDIUM_INT_MAX, SO_CLIENT_CLEAR_DATA, SO_CLIENT_DELETE_DATA, SO_CLIENT_INITIAL_DATA, SO_CLIENT_SEND_MESSAGE, SO_CLIENT_STATUS, SO_CLIENT_UPDATE_ATTRIBUTE, SO_CLIENT_UPDATE_DATA, SO_CONNECT, SO_DELETE_ATTRIBUTE, SO_DISCONNECT, SO_SEND_MESSAGE, SO_SET_ATTRIBUTE, SOURCE_TYPE_LIVE, SOURCE_TYPE_VOD, TYPE_ABORT, TYPE_AGGREGATE, TYPE_AUDIO_DATA, TYPE_BYTES_READ, TYPE_CHUNK_SIZE, TYPE_CLIENT_BANDWIDTH, TYPE_EDGE_ORIGIN, TYPE_FLEX_MESSAGE, TYPE_FLEX_SHARED_OBJECT, TYPE_FLEX_STREAM_SEND, TYPE_INVOKE, TYPE_NOTIFY, TYPE_PING, TYPE_SERVER_BANDWIDTH, TYPE_SHARED_OBJECT, TYPE_STREAM_METADATA, TYPE_VIDEO_DATA
Fields inherited from interface org.red5.server.messaging.IPushableConsumer
KEY
-
Constructor Summary
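Constructors
Constructor | Description
FileConsumer() | Default constructor
FileConsumer(IScope scope, File file) | Creates file consumer
-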
Method Summary
Modifier and Type | Method | Description
private boolean | acquireWriteFuture(int sliceLength) | Get the WriteFuture with a timeout based on the length of the slice to write.
void | afterPropertiesSet() |
private FileConsumer.QueuedData[] | createFixedLengthSlice(int sliceLength) |
private FileConsumer.QueuedData[] | createTimestampLimitedSlice(int timestamp) |
void | destroy() |
final void | doWrites() | Write all the queued items to the writer.
final void | doWrites(FileConsumer.QueuedData[] slice) | Write a slice of the queued items to the writer.
File | getFile() | Returns the file.
int | getQueueThreshold() | Returns the size of the delayed writing queue.
int | getSchedulerThreadSize() |
private void | init() | Initialization
boolean | isDelayWrite() | Whether or not the queue should be utilized.
void | onOOBControlMessage(IMessageComponent source, IPipe pipe, OOBControlMessage oobCtrlMsg) | Out-of-band control message handler
void | onPipeConnectionEvent(PipeConnectionEvent event) | Pipe connection event handler
void | pushMessage(IPipe pipe, IMessage message) | Push message through pipe
void | setAudioDecoderConfiguration(IRTMPEvent decoderConfig) | Sets an audio decoder configuration; some codecs require this, such as AAC.
void | setDelayWrite(boolean delayWrite) | Sets whether or not to use the queue.
void | setFile(File file) | Sets the file we're writing to.
void | setMode(String mode) | Sets the recording mode.
void | setQueueThreshold(int queueThreshold) | Sets the threshold for the queue.
void | setSchedulerThreadSize(int schedulerThreadSize) |
void | setScope(IScope scope) | Sets the scope for this consumer.
void | setVideoDecoderConfiguration(IRTMPEvent decoderConfig) | Sets a video decoder configuration; some codecs require this, such as AVC.
void | uninit() | Reset or uninitialize
private final void | write(int timestamp, IRTMPEvent msg) | Write incoming data to the file.
private final void | write(FileConsumer.QueuedData queued) | Adjust timestamp and write to the file.
private void | writeQueuedDataSlice |
-
Field Details
-
log
private static final org.slf4j.Logger log
-
scheduledExecutorService
Executor for all instance writer jobs
-
schedulerThreadSize
private int schedulerThreadSize
Queue writer thread count
-
queue
Queue to hold data for delayed writing
-
reentrantLock
Reentrant lock
-
writeLock
Write lock
-
readLock
Read lock
-
scope
Scope
-
file
File
-
writer
Tag writer
-
mode
Operation mode
-
startTimestamp
private int startTimestamp
Start timestamp
-
lastTimestamp
private int lastTimestamp
Last write timestamp
-
videoConfigurationTag
Video decoder configuration
-
audioConfigurationTag
Audio decoder configuration
-
queueThreshold
private int queueThreshold
Number of queued items needed before writes are initiated
-
percentage
private int percentage
Percentage of the queue which is sliced for writing
-
delayWrite
private boolean delayWrite
Whether or not to use a queue for delaying file writes. The queue is useful for keeping Tag items in their expected order based on their time stamp.
-
lastWrittenTs
private volatile int lastWrittenTs
Tracks the last timestamp written to prevent backwards time stamped data.
-
writerFuture
Keeps track of the last spawned write worker.
-
gotVideoKeyFrame
-
-
Constructor Details
-
FileConsumer
public FileConsumer()
Default constructor
-
FileConsumer
public FileConsumer(IScope scope, File file)
Creates file consumer
- Parameters:
scope - Scope of consumer
file - File
-
-
Method Details
-
afterPropertiesSet
public void afterPropertiesSet() throws Exception
- Specified by:
afterPropertiesSet in interface InitializingBean
- Throws:
Exception
-
pushMessage
public void pushMessage(IPipe pipe, IMessage message) throws IOException
Push message through pipe
- Specified by:
pushMessage in interface IPushableConsumer
- Parameters:
pipe - Pipe
message - Message to push
- Throws:
IOException - if message could not be written
-
writeQueuedDataSlice
-
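createFixedLengthSlice
private FileConsumer.QueuedData[] createFixedLengthSlice(int sliceLength)
-
createTimestampLimitedSlice
private FileConsumer.QueuedData[] createTimestampLimitedSlice(int timestamp)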
-
acquireWriteFuture
private boolean acquireWriteFuture(int sliceLength)
Get the WriteFuture with a timeout based on the length of the slice to write.
- Parameters:
sliceLength -
- Returns:
true if successful and false otherwise
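The description above suggests waiting on the previous write worker with a timeout that grows with the slice length. A minimal sketch of that idea follows; the class name, the `awaitPrevious` helper and the `MILLIS_PER_ITEM` budget are hypothetical and not taken from Red5, and the actual timeout formula is not documented here.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch; not the Red5 implementation.
class WriteFutureSketch {

    // Assumed time budget per queued item, in milliseconds (illustrative value only).
    static final long MILLIS_PER_ITEM = 10L;

    // Waits for the previous write job, scaling the timeout with the slice length.
    // Returns true if no job is in flight or the job has ended, false otherwise.
    static boolean awaitPrevious(Future<?> previous, int sliceLength) {
        if (previous == null) {
            return true; // nothing in flight
        }
        long timeoutMillis = Math.max(1, sliceLength) * MILLIS_PER_ITEM;
        try {
            previous.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // still running; the caller may retry later
        } catch (ExecutionException e) {
            return true; // the job ended, although with an error (an assumption of this sketch)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> quick = executor.submit(() -> { });
        System.out.println(awaitPrevious(quick, 5));
        executor.shutdown();
    }
}
```

Scaling the timeout with the slice length gives larger slices more time to finish before the next worker is started.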
-
onOOBControlMessage
public void onOOBControlMessage(IMessageComponent source, IPipe pipe, OOBControlMessage oobCtrlMsg)
Out-of-band control message handler
- Specified by:
onOOBControlMessage in interface IMessageComponent
- Parameters:
source - Source of message
pipe - Pipe that is used to transmit OOB message
oobCtrlMsg - OOB control message
-
onPipeConnectionEvent
public void onPipeConnectionEvent(PipeConnectionEvent event)
Pipe connection event handler
- Specified by:
onPipeConnectionEvent in interface IPipeConnectionListener
- Parameters:
event - Pipe connection event
-
init
private void init() throws IOException
Initialization
- Throws:
IOException - I/O exception
-
uninit
public void uninit()
Reset or uninitialize
-
doWrites
public final void doWrites()
Write all the queued items to the writer.
-
doWrites
public final void doWrites(FileConsumer.QueuedData[] slice)
Write a slice of the queued items to the writer.
- Parameters:
slice - set of queued data
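To illustrate how queued items might be flushed in timestamp order, here is a minimal sketch that drains a `PriorityQueue` while holding the write lock of a `ReentrantReadWriteLock`. The `Item` type stands in for `FileConsumer.QueuedData`, and the returned list stands in for the tag writer; both are assumptions made for illustration, not the Red5 code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch; not the Red5 implementation.
class QueueDrainSketch {

    // Stand-in for FileConsumer.QueuedData (assumed shape).
    static final class Item {
        final int timestamp;
        final String payload;

        Item(int timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // Items are ordered by timestamp, lowest first.
    private final PriorityQueue<Item> queue =
            new PriorityQueue<>(Comparator.comparingInt((Item i) -> i.timestamp));
    private final ReentrantReadWriteLock reentrantLock = new ReentrantReadWriteLock();
    private final Lock writeLock = reentrantLock.writeLock();
    private final Lock readLock = reentrantLock.readLock();

    void enqueue(int timestamp, String payload) {
        writeLock.lock();
        try {
            queue.add(new Item(timestamp, payload));
        } finally {
            writeLock.unlock();
        }
    }

    int size() {
        readLock.lock();
        try {
            return queue.size();
        } finally {
            readLock.unlock();
        }
    }

    // Drains every queued item in timestamp order; the list stands in for the writer.
    List<String> drainAll() {
        List<String> written = new ArrayList<>();
        writeLock.lock();
        try {
            Item next;
            while ((next = queue.poll()) != null) {
                written.add(next.timestamp + ":" + next.payload);
            }
        } finally {
            writeLock.unlock();
        }
        return written;
    }

    public static void main(String[] args) {
        QueueDrainSketch sketch = new QueueDrainSketch();
        sketch.enqueue(30, "c");
        sketch.enqueue(10, "a");
        sketch.enqueue(20, "b");
        System.out.println(sketch.drainAll());
    }
}
```

Items with equal timestamps have no guaranteed relative order in a `PriorityQueue`, so a real implementation may need a tie-breaker.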
-
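write
private final void write(int timestamp, IRTMPEvent msg)
Write incoming data to the file.
- Parameters:
timestamp - adjusted timestamp
msg - stream data
-
write
private final void write(FileConsumer.QueuedData queued)
Adjust timestamp and write to the file.
- Parameters:
queued - queued data for write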
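The exact adjustment is not documented here. One plausible reading, combining the startTimestamp and lastWrittenTs field descriptions, is to make timestamps relative to the start of the recording and never let them go backwards. The sketch below illustrates that reading only; the class name, the `adjust` method and the choice to clamp rather than drop late items are assumptions.

```java
// Hypothetical sketch; not the Red5 implementation.
class TimestampGuardSketch {

    private final int startTimestamp;
    private int lastWrittenTs; // last adjusted timestamp handed to the writer

    TimestampGuardSketch(int startTimestamp) {
        this.startTimestamp = startTimestamp;
    }

    // Makes the timestamp relative to the start and clamps it so it never decreases.
    synchronized int adjust(int timestamp) {
        int adjusted = Math.max(0, timestamp - startTimestamp);
        if (adjusted < lastWrittenTs) {
            adjusted = lastWrittenTs; // assumption: clamp instead of dropping the item
        }
        lastWrittenTs = adjusted;
        return adjusted;
    }

    public static void main(String[] args) {
        TimestampGuardSketch guard = new TimestampGuardSketch(1000);
        int[] incoming = {1000, 1040, 1020, 1100};
        for (int ts : incoming) {
            System.out.println(ts + " -> " + guard.adjust(ts));
        }
    }
}
```

With a start timestamp of 1000, the inputs 1000, 1040, 1020 and 1100 map to 0, 40, 40 and 100: the late item at 1020 is clamped to the last written value.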
-
setVideoDecoderConfiguration
public void setVideoDecoderConfiguration(IRTMPEvent decoderConfig)
Sets a video decoder configuration; some codecs require this, such as AVC.
- Parameters:
decoderConfig - video codec configuration
-
setAudioDecoderConfiguration
public void setAudioDecoderConfiguration(IRTMPEvent decoderConfig)
Sets an audio decoder configuration; some codecs require this, such as AAC.
- Parameters:
decoderConfig - audio codec configuration
-
setScope
public void setScope(IScope scope)
Sets the scope for this consumer.
- Parameters:
scope - scope
-
setFile
public void setFile(File file)
Sets the file we're writing to.
- Parameters:
file - file
-
getFile
public File getFile()
Returns the file.
- Returns:
file
-
setQueueThreshold
public void setQueueThreshold(int queueThreshold)
Sets the threshold for the queue. When the threshold is met, a worker is spawned to empty the sorted queue to the writer.
- Parameters:
queueThreshold - number of items to queue before spawning the worker
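The threshold and the percentage field can be pictured together: once enough items are queued, a slice covering a percentage of the queue is taken in timestamp order and handed to the writer. The sketch below shows one way this could work, using plain integer timestamps and slicing inline rather than in a spawned worker. The class name and the `offer` and `pending` methods are hypothetical, and the slice-length formula is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch; not the Red5 implementation.
class ThresholdSliceSketch {

    // Timestamps only, ordered lowest first by natural ordering.
    private final PriorityQueue<Integer> queue = new PriorityQueue<>();
    private final int queueThreshold;
    private final int percentage;

    ThresholdSliceSketch(int queueThreshold, int percentage) {
        this.queueThreshold = queueThreshold;
        this.percentage = percentage;
    }

    // Queues a timestamp; returns the slice to write once the threshold is met,
    // or an empty list while the queue is still filling.
    List<Integer> offer(int timestamp) {
        queue.add(timestamp);
        if (queue.size() < queueThreshold) {
            return new ArrayList<>();
        }
        // Assumed formula: take the given percentage of the queue, at least one item.
        int sliceLength = Math.max(1, queue.size() * percentage / 100);
        List<Integer> slice = new ArrayList<>(sliceLength);
        for (int i = 0; i < sliceLength; i++) {
            slice.add(queue.poll());
        }
        return slice;
    }

    int pending() {
        return queue.size();
    }

    public static void main(String[] args) {
        ThresholdSliceSketch sketch = new ThresholdSliceSketch(4, 50);
        for (int ts : new int[] {40, 10, 30, 20}) {
            List<Integer> slice = sketch.offer(ts);
            if (!slice.isEmpty()) {
                System.out.println("write " + slice + ", keep " + sketch.pending());
            }
        }
    }
}
```

Leaving part of the queue behind presumably gives late-arriving items with earlier timestamps a chance to be sorted in before they are written.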
-
getQueueThreshold
public int getQueueThreshold()
Returns the size of the delayed writing queue.
- Returns:
queue length
-
isDelayWrite
public boolean isDelayWrite()
Whether or not the queue should be utilized.
- Returns:
true if using the queue, false if sending directly to the writer
-
setDelayWrite
public void setDelayWrite(boolean delayWrite)
Sets whether or not to use the queue.
- Parameters:
delayWrite - true to use the queue, false if not
-
getSchedulerThreadSize
public int getSchedulerThreadSize()
- Returns:
the schedulerThreadSize
-
setSchedulerThreadSize
public void setSchedulerThreadSize(int schedulerThreadSize)
- Parameters:
schedulerThreadSize - the schedulerThreadSize to set
-
setMode
public void setMode(String mode)
Sets the recording mode.
- Parameters:
mode - either "record" or "append" depending on the type of action to perform
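The two mode strings can be thought of as selecting how the target file is opened. The sketch below shows that distinction with `java.nio.file` only: "record" starts the file over and "append" adds to its end. The class and method names are hypothetical, and how FileConsumer actually treats each mode is not documented here; a real recorder would also need to handle container headers and timestamps when appending.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch; not the Red5 implementation.
class RecordingModeSketch {

    // Writes data to the file according to the mode string.
    static void write(Path file, String mode, byte[] data) throws IOException {
        if ("append".equals(mode)) {
            // Keep existing content and add to the end.
            Files.write(file, data, StandardOpenOption.CREATE,
                    StandardOpenOption.WRITE, StandardOpenOption.APPEND);
        } else if ("record".equals(mode)) {
            // Start over: discard any existing content.
            Files.write(file, data, StandardOpenOption.CREATE,
                    StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
        } else {
            throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("recording", ".bin");
        write(file, "record", "abc".getBytes(StandardCharsets.UTF_8));
        write(file, "append", "def".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
        Files.delete(file);
    }
}
```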
-
destroy
public void destroy() throws Exception
- Specified by:
destroy in interface DisposableBean
- Throws:
Exception
-