namespace RsyncNet.Delta
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using Helpers;
public class DeltaStreamer
{
private int _streamChunkSize;
public DeltaStreamer()
{
StreamChunkSize = 16384;
}
#region Properties, indexers, events and operators: public
public int StreamChunkSize
{
get { return _streamChunkSize; }
set
{
if (value <= 0) throw new ArgumentException("value must be a positive number greater than 0");
_streamChunkSize = value;
}
}
#endregion
#region Methods: public
///
/// Reconstructs remote data, given a delta stream and a random access / seekable input stream,
/// all written to outputStream.
///
/// sequential stream of deltas
/// seekable and efficiently random access stream
/// sequential stream for output
public void Receive(Stream deltaStream, Stream inputStream, Stream outputStream)
{
if (deltaStream == null) throw new ArgumentNullException("deltaStream");
if (inputStream == null) throw new ArgumentNullException("inputStream");
if (outputStream == null) throw new ArgumentNullException("outputStream");
if (inputStream.CanSeek == false) throw new InvalidOperationException("inputStream must be seekable");
int commandByte;
while ((commandByte = deltaStream.ReadByte()) != -1)
{
if (commandByte == DeltaStreamConstants.NEW_BLOCK_START_MARKER)
{
int length = deltaStream.ReadInt();
var buffer = new byte[length];
deltaStream.Read(buffer, 0, length);
outputStream.Write(buffer, 0, length);
}
else if (commandByte == DeltaStreamConstants.COPY_BLOCK_START_MARKER)
{
long sourceOffset = deltaStream.ReadLong();
int length = deltaStream.ReadInt();
var buffer = new byte[length];
inputStream.Seek(sourceOffset, SeekOrigin.Begin);
inputStream.Read(buffer, 0, length);
outputStream.Write(buffer, 0, length);
}
else throw new IOException("Invalid data found in deltaStream");
}
}
public void Send(IEnumerable deltas, Stream inputStream, Stream outputStream)
{
if (deltas == null) throw new ArgumentNullException("deltas");
if (deltas.Count() == 0) throw new ArgumentException("'deltas' must have one or more IDelta objects");
if (inputStream == null) throw new ArgumentNullException("inputStream");
if (outputStream == null) throw new ArgumentNullException("outputStream");
foreach (IDelta delta in deltas)
{
if (delta is ByteDelta)
{
SendByteDelta(delta as ByteDelta, inputStream, outputStream);
}
else if (delta is CopyDelta)
{
SendCopyDelta(delta as CopyDelta, inputStream, outputStream);
}
}
}
#endregion
#region Methods: private
private void SendByteDelta(ByteDelta delta, Stream inputStream, Stream outputStream)
{
outputStream.WriteByte(DeltaStreamConstants.NEW_BLOCK_START_MARKER);
outputStream.WriteInt(delta.Length);
var buffer = new byte[delta.Length];
inputStream.Seek(delta.Offset, SeekOrigin.Begin);
long totalRead = 0;
while (totalRead < delta.Length)
{
var toRead = (int) MathEx.Bounded(0, StreamChunkSize, delta.Length - totalRead);
int readLength = inputStream.Read(buffer, 0, toRead);
if (readLength == 0 && totalRead < delta.Length)
throw new IOException("Input stream offset out of bounds, or not enough data available");
outputStream.Write(buffer, 0, readLength);
totalRead += readLength;
}
}
private void SendCopyDelta(CopyDelta delta, Stream inputStream, Stream outputStream)
{
if (inputStream.CanSeek == false) throw new IOException("inputStream not seekable");
outputStream.WriteByte(DeltaStreamConstants.COPY_BLOCK_START_MARKER);
outputStream.WriteLong(delta.Offset);
outputStream.WriteInt(delta.Length);
inputStream.Seek(delta.Length, SeekOrigin.Current);
}
#endregion
#region Nested type: DeltaStreamConstants
internal static class DeltaStreamConstants
{
#region Fields: public
public static byte COPY_BLOCK_START_MARKER = (byte) 'C';
public static byte NEW_BLOCK_START_MARKER = (byte) 'N';
#endregion
}
#endregion
}
}