jellyfin/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs

82 lines
2.3 KiB
C#
Raw Normal View History

2016-10-07 08:08:13 -07:00
using System;
2017-02-01 13:55:56 -07:00
using System.Collections.Concurrent;
2016-10-07 08:08:13 -07:00
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Model.Logging;
2017-03-26 09:26:52 -07:00
using MediaBrowser.Model.Net;
2016-10-07 08:08:13 -07:00
2016-11-03 16:35:19 -07:00
namespace Emby.Server.Implementations.LiveTv.TunerHosts
2016-10-07 08:08:13 -07:00
{
/// <summary>
/// Reads from a single source stream and hands every chunk that was read to each
/// output stream currently registered through <see cref="CopyToAsync"/>.
/// </summary>
public class MulticastStream
{
    // Registered consumers, keyed by QueueStream.Id. CopyToAsync adds and removes entries
    // while CopyUntilCancelled enumerates them (presumably from different threads, hence
    // the concurrent collection).
    private readonly ConcurrentDictionary<Guid, QueueStream> _outputStreams = new ConcurrentDictionary<Guid, QueueStream>();

    // Size in bytes of the buffer used for each read from the source.
    private const int BufferSize = 81920;

    // How long to wait before polling the source again after a read returned no data.
    private const int EmptyReadDelayMs = 100;

    private readonly ILogger _logger;

    /// <summary>
    /// Creates a new multicast stream.
    /// </summary>
    /// <param name="logger">Logger passed on to every <c>QueueStream</c> created by <see cref="CopyToAsync"/>.</param>
    public MulticastStream(ILogger logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Repeatedly reads from <paramref name="source"/> and queues each chunk on every
    /// registered output stream. A read that returns no bytes does not end the loop; the
    /// method waits briefly and tries again. The loop only exits through an exception
    /// (cancellation or a failure while reading/queueing).
    /// </summary>
    /// <param name="source">The stream to read from.</param>
    /// <param name="onStarted">
    /// Optional callback, scheduled once via <see cref="Task.Run(Action)"/> after the first
    /// non-empty chunk has been queued. May be <c>null</c>.
    /// </param>
    /// <param name="cancellationToken">Token that stops the copy loop.</param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is <c>null</c>.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was cancelled.</exception>
    public async Task CopyUntilCancelled(Stream source, Action onStarted, CancellationToken cancellationToken)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();

            // A fresh array is allocated for every read because the array itself is handed
            // to each QueueStream. NOTE(review): presumably QueueStream.Queue keeps the
            // reference instead of copying - confirm before hoisting or pooling this buffer.
            byte[] buffer = new byte[BufferSize];

            var bytesRead = source.Read(buffer, 0, buffer.Length);

            if (bytesRead > 0)
            {
                foreach (var stream in _outputStreams)
                {
                    stream.Value.Queue(buffer, 0, bytesRead);
                }

                if (onStarted != null)
                {
                    // Clear the callback before scheduling it so it runs at most once.
                    var onStartedCopy = onStarted;
                    onStarted = null;
                    Task.Run(onStartedCopy);
                }
            }
            else
            {
                // Nothing available right now; back off before polling again. The token is
                // passed so that cancellation interrupts the wait immediately instead of
                // only being noticed once the delay has elapsed.
                await Task.Delay(EmptyReadDelayMs, cancellationToken).ConfigureAwait(false);
            }
        }
    }

    /// <summary>
    /// Registers <paramref name="stream"/> as an output, starts its queue, and unregisters
    /// it again once <c>QueueStream.Start</c> returns or throws.
    /// </summary>
    /// <param name="stream">The destination stream wrapped by the new <c>QueueStream</c>.</param>
    /// <param name="cancellationToken">Token forwarded to <c>QueueStream.Start</c>.</param>
    /// <returns>
    /// An already-completed task; everything this method does happens before it returns.
    /// Exceptions thrown by <c>QueueStream.Start</c> propagate directly to the caller.
    /// </returns>
    public Task CopyToAsync(Stream stream, CancellationToken cancellationToken)
    {
        var queueStream = new QueueStream(stream, _logger);

        _outputStreams.TryAdd(queueStream.Id, queueStream);

        try
        {
            queueStream.Start(cancellationToken);
        }
        finally
        {
            // Always unregister so CopyUntilCancelled stops queueing data for this consumer.
            // A separate local receives the removed value so queueStream is never overwritten.
            // No garbage collection is forced here; reclaiming the buffers is left to the runtime.
            QueueStream removed;
            _outputStreams.TryRemove(queueStream.Id, out removed);
        }

        return Task.FromResult(true);
    }
}
}