Package org.red5.server.stream.consumer
Class FileConsumer
java.lang.Object
org.red5.server.stream.consumer.FileConsumer
- All Implemented Interfaces:
IConsumer
,IMessageComponent
,IPipeConnectionListener
,IPushableConsumer
,Constants
,DisposableBean
,InitializingBean
public class FileConsumer
extends Object
implements Constants, IPushableConsumer, IPipeConnectionListener, InitializingBean, DisposableBean
Consumer that pushes messages to file. Used when recording live streams.
- Author:
- The Red5 Project, Paul Gregoire (mondain@gmail.com), Vladimir Hmelyoff (vlhm@splitmedialabs.com), Octavian Naicu (naicuoctavian@gmail.com)
-
Nested Class Summary
Modifier and TypeClassDescriptionprivate static final class
Queued data wrapper. -
Field Summary
Modifier and TypeFieldDescriptionprivate ITag
Audio decoder configurationprivate boolean
Whether or not to use a queue for delaying file writes.private File
Fileprivate AtomicBoolean
private int
Last write timestampprivate int
Tracks the last timestamp written to prevent backwards time stamped data.private static final org.slf4j.Logger
private String
Operation modeprivate int
Percentage of the queue which is sliced for writingprivate PriorityQueue<FileConsumer.QueuedData>
Queue to hold data for delayed writingprivate int
Number of queued items needed before writes are initiatedprivate Lock
Read lockprivate ReentrantReadWriteLock
Reentrant lockprivate ScheduledExecutorService
Executor for all instance writer jobsprivate int
Queue writer thread countprivate IScope
Scopeprivate int
Start timestampprivate ITag
Video decoder configurationprivate Lock
Write lockprivate ITagWriter
Tag writerprivate Future<?>
Keeps track of the last spawned write worker.Fields inherited from interface org.red5.server.net.rtmp.message.Constants
HANDSHAKE_SIZE, HEADER_CONTINUE, HEADER_NEW, HEADER_SAME_SOURCE, HEADER_TIMER_CHANGE, MEDIUM_INT_MAX, SO_CLIENT_CLEAR_DATA, SO_CLIENT_DELETE_DATA, SO_CLIENT_INITIAL_DATA, SO_CLIENT_SEND_MESSAGE, SO_CLIENT_STATUS, SO_CLIENT_UPDATE_ATTRIBUTE, SO_CLIENT_UPDATE_DATA, SO_CONNECT, SO_DELETE_ATTRIBUTE, SO_DISCONNECT, SO_SEND_MESSAGE, SO_SET_ATTRIBUTE, SOURCE_TYPE_LIVE, SOURCE_TYPE_VOD, TYPE_ABORT, TYPE_AGGREGATE, TYPE_AUDIO_DATA, TYPE_BYTES_READ, TYPE_CHUNK_SIZE, TYPE_CLIENT_BANDWIDTH, TYPE_EDGE_ORIGIN, TYPE_FLEX_MESSAGE, TYPE_FLEX_SHARED_OBJECT, TYPE_FLEX_STREAM_SEND, TYPE_INVOKE, TYPE_NOTIFY, TYPE_PING, TYPE_SERVER_BANDWIDTH, TYPE_SHARED_OBJECT, TYPE_STREAM_METADATA, TYPE_VIDEO_DATA
Fields inherited from interface org.red5.server.messaging.IPushableConsumer
KEY
-
Constructor Summary
-
Method Summary
Modifier and TypeMethodDescriptionprivate boolean
acquireWriteFuture
(int sliceLength) Get the WriteFuture with a timeout based on the length of the slice to write.void
private FileConsumer.QueuedData[]
createFixedLengthSlice
(int sliceLength) private FileConsumer.QueuedData[]
createTimestampLimitedSlice
(int timestamp) void
destroy()
final void
doWrites()
Write all the queued items to the writer.final void
doWrites
(FileConsumer.QueuedData[] slice) Write a slice of the queued items to the writer.getFile()
Returns the file.int
Returns the size of the delayed writing queue.int
private void
init()
Initializationboolean
Whether or not the queue should be utilized.void
onOOBControlMessage
(IMessageComponent source, IPipe pipe, OOBControlMessage oobCtrlMsg) Out-of-band control message handlervoid
Pipe connection event handlervoid
pushMessage
(IPipe pipe, IMessage message) Push message through pipevoid
setAudioDecoderConfiguration
(IRTMPEvent decoderConfig) Sets a audio decoder configuration; some codecs require this, such as AAC.void
setDelayWrite
(boolean delayWrite) Sets whether or not to use the queue.void
Sets the file we're writing to.void
Sets the recording mode.void
setQueueThreshold
(int queueThreshold) Sets the threshold for the queue.void
setSchedulerThreadSize
(int schedulerThreadSize) void
Sets the scope for this consumer.void
setVideoDecoderConfiguration
(IRTMPEvent decoderConfig) Sets a video decoder configuration; some codecs require this, such as AVC.void
uninit()
Reset or uninitializeprivate final void
write
(int timestamp, IRTMPEvent msg) Write incoming data to the file.private final void
write
(FileConsumer.QueuedData queued) Adjust timestamp and write to the file.private void
-
Field Details
-
log
private static final org.slf4j.Logger log -
scheduledExecutorService
Executor for all instance writer jobs -
schedulerThreadSize
private int schedulerThreadSizeQueue writer thread count -
queue
Queue to hold data for delayed writing -
reentrantLock
Reentrant lock -
writeLock
Write lock -
readLock
Read lock -
scope
Scope -
file
File -
writer
Tag writer -
mode
Operation mode -
startTimestamp
private int startTimestampStart timestamp -
lastTimestamp
private int lastTimestampLast write timestamp -
videoConfigurationTag
Video decoder configuration -
audioConfigurationTag
Audio decoder configuration -
queueThreshold
private int queueThresholdNumber of queued items needed before writes are initiated -
percentage
private int percentagePercentage of the queue which is sliced for writing -
delayWrite
private boolean delayWriteWhether or not to use a queue for delaying file writes. The queue is useful for keeping Tag items in their expected order based on their time stamp. -
lastWrittenTs
private volatile int lastWrittenTsTracks the last timestamp written to prevent backwards time stamped data. -
writerFuture
Keeps track of the last spawned write worker. -
gotVideoKeyFrame
-
-
Constructor Details
-
FileConsumer
public FileConsumer()Default ctor -
FileConsumer
Creates file consumer- Parameters:
scope
- Scope of consumerfile
- File
-
-
Method Details
-
afterPropertiesSet
- Specified by:
afterPropertiesSet
in interfaceInitializingBean
- Throws:
Exception
-
pushMessage
Push message through pipe- Specified by:
pushMessage
in interfaceIPushableConsumer
- Parameters:
pipe
- Pipemessage
- Message to push- Throws:
IOException
- if message could not be written
-
writeQueuedDataSlice
-
createFixedLengthSlice
-
createTimestampLimitedSlice
-
acquireWriteFuture
private boolean acquireWriteFuture(int sliceLength) Get the WriteFuture with a timeout based on the length of the slice to write.- Parameters:
sliceLength
-- Returns:
- true if successful and false otherwise
-
onOOBControlMessage
Out-of-band control message handler- Specified by:
onOOBControlMessage
in interfaceIMessageComponent
- Parameters:
source
- Source of messagepipe
- Pipe that is used to transmit OOB messageoobCtrlMsg
- OOB control message
-
onPipeConnectionEvent
Pipe connection event handler- Specified by:
onPipeConnectionEvent
in interfaceIPipeConnectionListener
- Parameters:
event
- Pipe connection event
-
init
Initialization- Throws:
IOException
- I/O exception
-
uninit
public void uninit()Reset or uninitialize -
doWrites
public final void doWrites()Write all the queued items to the writer. -
doWrites
Write a slice of the queued items to the writer.- Parameters:
slice
- set of queued data
-
write
Write incoming data to the file.- Parameters:
timestamp
- adjusted timestampmsg
- stream data
-
write
Adjust timestamp and write to the file.- Parameters:
queued
- queued data for write
-
setVideoDecoderConfiguration
Sets a video decoder configuration; some codecs require this, such as AVC.- Parameters:
decoderConfig
- video codec configuration
-
setAudioDecoderConfiguration
Sets a audio decoder configuration; some codecs require this, such as AAC.- Parameters:
decoderConfig
- audio codec configuration
-
setScope
Sets the scope for this consumer.- Parameters:
scope
- scope
-
setFile
Sets the file we're writing to.- Parameters:
file
- file
-
getFile
Returns the file.- Returns:
- file
-
setQueueThreshold
public void setQueueThreshold(int queueThreshold) Sets the threshold for the queue. When the threshold is met a worker is spawned to empty the sorted queue to the writer.- Parameters:
queueThreshold
- number of items to queue before spawning worker
-
getQueueThreshold
public int getQueueThreshold()Returns the size of the delayed writing queue.- Returns:
- queue length
-
isDelayWrite
public boolean isDelayWrite()Whether or not the queue should be utilized.- Returns:
- true if using the queue, false if sending directly to the writer
-
setDelayWrite
public void setDelayWrite(boolean delayWrite) Sets whether or not to use the queue.- Parameters:
delayWrite
- true to use the queue, false if not
-
getSchedulerThreadSize
public int getSchedulerThreadSize()- Returns:
- the schedulerThreadSize
-
setSchedulerThreadSize
public void setSchedulerThreadSize(int schedulerThreadSize) - Parameters:
schedulerThreadSize
- the schedulerThreadSize to set
-
setMode
Sets the recording mode.- Parameters:
mode
- either "record" or "append" depending on the type of action to perform
-
destroy
- Specified by:
destroy
in interfaceDisposableBean
- Throws:
Exception
-