0 votes
by (190 points)

Hi, I'm trying to upload and download files to a cloud service via an SFTP file server. I have this sample code for local drive files, where the file is read as a stream. In my case, however, I have to send the file to our cloud API in 1 MB chunks during the upload process, and when downloading, the chunks also come from the cloud API in 1 MB pieces. Wouldn't there be excessive memory usage if I assembled all the chunks into a single stream and sent it after downloading or uploading them?

protected override NodeContent GetContent(NodeBase node, NodeContentParameters contentRequest)
{
    string fullPath = GetFullPath(node.Path);
    if (contentRequest.AccessType == NodeContentAccess.Read)
    {
        var readOnlyStream = File.OpenRead(fullPath);
        return NodeContent.CreateReadOnlyContent(readOnlyStream);
    }

    FileAccess realFsAccessMode = (contentRequest.AccessType == NodeContentAccess.Write)
        ? FileAccess.Write
        : FileAccess.ReadWrite;

    Stream readWriteStream = File.Open(fullPath, FileMode.Open, realFsAccessMode);
    return NodeContent.CreateImmediateWriteContent(readWriteStream);
}
Applies to: Rebex SFTP, File Server

3 Answers

0 votes
by (5.3k points)
edited by

Part I

Hi Ahmet,
thanks for the question. Unfortunately, Rebex SFTP and the VFS are components that have to support many disparate scenarios, so I am afraid we don't have an out-of-the-box, magical solution best suited to yours. You are in charge of the scenario-specific parts of the concrete solution: you should know its requirements and limitations, and whether the many data parts/chunks cause any problem. If you don't know, write a prototype and then measure the outcome instead of only estimating or guessing. It is possible that the bottlenecks of your solution lie elsewhere than you think and that the increased memory traffic will be acceptable in your case.
That being said:
If the heavy memory traffic does pose a problem, you can try pooling resources, e.g. streams. RecyclableMemoryStream from Microsoft might come in handy (https://github.com/microsoft/Microsoft.IO.RecyclableMemoryStream).
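For illustration, here is a minimal sketch of the pooling idea (the class and method names are just placeholders, not part of our API):

using System.IO;
using Microsoft.IO;

class PooledChunkWriter
{
  // One shared, long-lived manager; it owns the pooled buffers.
  private static readonly RecyclableMemoryStreamManager Manager = new RecyclableMemoryStreamManager();

  public void ProcessChunk(byte[] chunk, int count)
  {
    // GetStream() rents a pooled MemoryStream instead of allocating a fresh one.
    using (MemoryStream buffer = Manager.GetStream())
    {
      buffer.Write(chunk, 0, count);
      buffer.Position = 0;
      // ...hand the buffer over to your upload code here...
    } // Dispose returns the underlying buffer to the manager for reuse.
  }
}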
The experimental BlobStream write stream below (part II), taken from our experimental Azure BLOB provider and built on RecyclableMemoryStream, may serve as an inspiration for you.

Key parts of the BlobStream code related to RecyclableMemoryStream (a condensed sketch of the flow follows this list; the full code is in part II):

1) The instance variable _memoryStreamManager of type RecyclableMemoryStreamManager.

2) The BlobStream.Write method acquires a Stream from _memoryStreamManager.

3) The background UploadData task releases (disposes) the stream acquired in the previous step once the write is done. Disposing the stream returns it to _memoryStreamManager so that it can be reused.
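Heavily simplified, the pattern looks roughly like this (PooledBlockUploader, EnqueueBlock and UploadBlockAsync are hypothetical placeholders, not the real provider code):

using System.IO;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.IO;

class PooledBlockUploader
{
  private readonly RecyclableMemoryStreamManager _manager = new RecyclableMemoryStreamManager();
  private readonly Channel<MemoryStream> _channel = Channel.CreateUnbounded<MemoryStream>();

  // 1) + 2) Rent a pooled stream, fill it with one block of data and hand it to the background task.
  public void EnqueueBlock(byte[] buffer, int offset, int count)
  {
    MemoryStream block = _manager.GetStream();
    block.Write(buffer, offset, count);
    block.Position = 0;
    _channel.Writer.TryWrite(block);
  }

  // 3) Background loop: upload each block, then dispose the stream,
  //    which returns its buffer to the manager for reuse.
  public async Task UploadLoopAsync(CancellationToken ct)
  {
    await foreach (MemoryStream block in _channel.Reader.ReadAllAsync(ct))
    {
      await UploadBlockAsync(block, ct);
      block.Dispose();
    }
  }

  // Placeholder for the real upload call (StageBlockAsync, your cloud API, ...).
  private Task UploadBlockAsync(Stream block, CancellationToken ct) => Task.CompletedTask;
}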

(Due to the forum post size limit, I have to divide my code into three posts.)

0 votes
by (5.3k points)
edited by

Part II

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using Microsoft.IO;

namespace Rebex.SimpleAzureBlobProvider.Tools
{
  public class BlobStream : Stream
  {
    private enum StreamWriteMode
    {
      Overwrite,
      Update
    }
    private const int FIRST_BLOCK_INDEX = 0;
    private readonly BlobClient _blobClient;
    private readonly BlockBlobClient _blobBlockClient;
    private ChannelReader<StreamBlockIdPair> _channelReader;
    private Channel<StreamBlockIdPair> _channelStream;
    private ChannelWriter<StreamBlockIdPair> _channelWriter;
    private readonly RecyclableMemoryStreamManager _memoryStreamManager;
    private CancellationTokenSource _cts;
    private int _position;
    private Task _uploadToAzureTask;
    private readonly BlobWriteStreamSettings _settings;
    private long _length;
    private MemoryStream _lastWaitingToUploadStream;
    private StreamWriteMode _mode;
    private List<Action> _waitingActionsBeforeWrite;

    public BlobStream(BlobClient blobClient, BlockBlobClient blobBlockClient, RecyclableMemoryStreamManager memoryStreamManager, BlobWriteStreamSettings settings = null)
    {
      _settings = settings ?? BlobWriteStreamSettings.Default;
      _blobClient = blobClient ?? throw new ArgumentNullException(nameof(blobClient));
      _blobBlockClient = blobBlockClient ?? throw new ArgumentNullException(nameof(blobBlockClient));

      _memoryStreamManager = memoryStreamManager ?? throw new ArgumentNullException(nameof(memoryStreamManager));
      CreateChannel();
      _position = 0;
      _length = 0;
      _uploadToAzureTask = null;
      _lastWaitingToUploadStream = null;
      var committedBlockList = _blobBlockClient.GetBlockList(BlockListTypes.Committed).Value.CommittedBlocks;
      _mode = !committedBlockList.Any() || (committedBlockList.First().Size != _settings.MinBatchSizeInBytes || committedBlockList.First().Name != createBlockId(FIRST_BLOCK_INDEX))
                ? StreamWriteMode.Update
                : StreamWriteMode.Overwrite;

      _waitingActionsBeforeWrite = new List<Action>();
    }

    private void CreateChannel()
    {
      _cts = new CancellationTokenSource();
      _channelStream = Channel.CreateUnbounded<StreamBlockIdPair>(new UnboundedChannelOptions
                                                       {
                                                         SingleReader = true,
                                                         SingleWriter = true
                                                       });
      _channelWriter = _channelStream.Writer;
      _channelReader = _channelStream.Reader;
    }

    public override bool CanRead => false;

    public override bool CanSeek => false;

    public override bool CanWrite => true;

    public override long Length => _length;

    public override long Position
    {
      get => _position;
      set => throw new NotSupportedException();
    }

    public override void Flush()
    {
      // Data is handed off to the background upload task as soon as a block is full,
      // so there is nothing to flush here.
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
      throw new NotSupportedException();
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
      throw new NotSupportedException();
    }

    public override void SetLength(long value)
    {
      throw new NotSupportedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
      if (buffer == null)
      {
        throw new ArgumentNullException(nameof(buffer));
      }

      if (offset < 0)
      {
        throw new ArgumentOutOfRangeException(nameof(offset));
      }

      if (count < 0)
      {
        throw new ArgumentOutOfRangeException(nameof(count));
      }

      if (count == 0)
      {
        return;
      }

      if (offset + count > buffer.Length)
      {
        throw new ArgumentException("sum + offset is greater than buffer length.");
      }

      if (_uploadToAzureTask != null && _uploadToAzureTask.IsCompleted)
      {
        throw new InvalidOperationException("Unexpected error. Upload task finished prematurely.", _uploadToAzureTask.Exception);
      }



      var toProcessLength = count;
      var currentOffset = offset;
      while (toProcessLength > 0)
      {
        if (_lastWaitingToUploadStream == null)
        {
          _lastWaitingToUploadStream = _memoryStreamManager.GetStream();
        }

        var toWriteBytes = (int) Math.Min(_settings.MinBatchSizeInBytes - _lastWaitingToUploadStream.Length, toProcessLength);
        _lastWaitingToUploadStream.Write(buffer, currentOffset, toWriteBytes);
        toProcessLength -= toWriteBytes;
        currentOffset += toWriteBytes;

        if (_lastWaitingToUploadStream.Length == _settings.MinBatchSizeInBytes)
        {
          if (_uploadToAzureTask == null)
          {
            CreateChannel();
            _uploadToAzureTask = Task.Run(() => UploadData(_cts.Token));
          }

          _lastWaitingToUploadStream.Position = 0;
          // Index (0-based) of the block that has just been filled; the current
          // write position is exactly at the end of that block.
          var blockIndex = (_position + (count - toProcessLength)) / _settings.MinBatchSizeInBytes - 1;
          Debug.Assert((_position + (count - toProcessLength)) % _settings.MinBatchSizeInBytes == 0);
          var streamBlockIdPair = new StreamBlockIdPair(_lastWaitingToUploadStream, createBlockId(blockIndex));
          var writeResult = _channelWriter.TryWrite(streamBlockIdPair);
          _lastWaitingToUploadStream = null;
          Debug.Assert(writeResult);
        }
      }

      _position += count;
      if (_position > _length)
      {
        _length = _position;
      }
    }

    protected override void Dispose(bool disposing)
    {
      if (disposing)
      {
        try
        {
          if (_uploadToAzureTask == null)
          {
            if (_lastWaitingToUploadStream != null && _lastWaitingToUploadStream.Length > 0)
            {
              _lastWaitingToUploadStream.Position = 0;
              _blobClient.Upload(_lastWaitingToUploadStream, overwrite: true);
              _lastWaitingToUploadStream = null;
            }
          }
          else
          {
            if (_lastWaitingToUploadStream != null && _lastWaitingToUploadStream.Length > 0)
            {
              _lastWaitingToUploadStream.Position = 0;
              // The last, partially filled block; its index follows the full blocks already enqueued.
              var lastBlockId = createBlockId(_position / _settings.MinBatchSizeInBytes);
              var writeResult = _channelWriter.TryWrite(new StreamBlockIdPair(_lastWaitingToUploadStream, lastBlockId));
              Debug.Assert(writeResult);
            }

            _channelWriter.TryComplete();
            _uploadToAzureTask.Wait();
          }
        }
        catch (Exception e)
        {
          Console.WriteLine(e);
        }
        finally
        {
          _lastWaitingToUploadStream?.Dispose();
        }
      }

      base.Dispose(disposing);
    }

    private async Task UploadData(CancellationToken ct)
    {
      var blockIds = new List<string>();
      var tasksStreamDictionary = new Dictionary<Task, Stream>();
      var uploadCts = new CancellationTokenSource();
      var combinedCts = CancellationTokenSource.CreateLinkedTokenSource(ct, uploadCts.Token);
      var combinedCtsToken = combinedCts.Token;

      try
      {
        while (await _channelReader.WaitToReadAsync(combinedCtsToken).ConfigureAwait(false))
        {

          while (_channelReader.TryRead(out var streamBlockIdPair))
          {
            // Use the block id assigned when the block was created in Write/Dispose.
            var blockIdString = streamBlockIdPair.BlockId;
            blockIds.Add(blockIdString);

            if (tasksStreamDictionary.Keys.Any(task => task.IsFaulted || task.IsCanceled))
            {
              var firstFailedTask = tasksStreamDictionary.Keys.First(task => task.IsFaulted || task.IsCanceled);
              await firstFailedTask.ConfigureAwait(false);
            }

            if (tasksStreamDictionary.Count >= _settings.MaxUploadTasks)
            {
              var completedTask = await Task.WhenAny(tasksStreamDictionary.Keys).ConfigureAwait(false);
              tasksStreamDictionary[completedTask].Dispose();
              tasksStreamDictionary.Remove(completedTask);
              await completedTask.ConfigureAwait(false);
            }

            combinedCtsToken.ThrowIfCancellationRequested();
            tasksStreamDictionary.Add(_blobBlockClient.StageBlockAsync(blockIdString, streamBlockIdPair.Stream, cancellationToken: combinedCtsToken), streamBlockIdPair.Stream);
          }
        }

        await Task.WhenAll(tasksStreamDictionary.Keys).ConfigureAwait(false);
        combinedCtsToken.ThrowIfCancellationRequested();

        if (!_channelReader.Completion.IsFaulted && blockIds.Count > 0)
        {
          await _blobBlockClient.CommitBlockListAsync(blockIds, cancellationToken: combinedCtsToken)
                           .ConfigureAwait(false);
        }
      }
      finally
      {
        uploadCts.Cancel();
        try
        {
          await Task.WhenAll(tasksStreamDictionary.Keys).ConfigureAwait(false);
        }
        catch(Exception exception)
        {
          Debug.WriteLine(exception);
        }

        foreach (var stream in tasksStreamDictionary.Values)
        {
          stream.Dispose();
        }

        tasksStreamDictionary.Clear();
      }
    }

    private static string createBlockId(int blockCounter)
    {
      var blockId = BitConverter.GetBytes(blockCounter);
      var blockIdString = Convert.ToBase64String(blockId, 0, blockId.Length);
      return blockIdString;
    }

    private readonly struct StreamBlockIdPair
    {
      public StreamBlockIdPair(Stream stream, string blockId)
      {
        Stream = stream;
        BlockId = blockId;
      }
      public Stream Stream
      {
        get;
      }

      public string BlockId
      {
        get;
      }
    }
  }
}
0 votes
by (5.3k points)
edited by

Part III

Part of the GetContent method in the Azure provider.

    protected override NodeContent GetContent(NodeBase node, NodeContentParameters contentParameters)
    {
      if (contentParameters.AccessType == NodeContentAccess.Write)
      {
        var containerClient = GetContainerClientForNode(node.Parent);
        var blobClient = containerClient.GetBlobClient(node.Name);
        var blockBlobClient = containerClient.GetBlockBlobClient(node.Name);

        var blobWriteStream = new BlobStream(blobClient, blockBlobClient, _memoryStreamManager, new BlobWriteStreamSettings(1));

        return NodeContent.CreateImmediateWriteContent(blobWriteStream);
      }

      // Handle also read and read/write access.
    }
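For completeness, the read branch might look roughly like this (just a sketch; it assumes BlobClient.OpenRead is available in your Azure SDK version):

      if (contentParameters.AccessType == NodeContentAccess.Read)
      {
        // OpenRead returns a read-only stream that downloads the blob on demand,
        // so the whole file does not have to be buffered in memory.
        var blobClient = GetContainerClientForNode(node.Parent).GetBlobClient(node.Name);
        var readOnlyStream = blobClient.OpenRead();
        return NodeContent.CreateReadOnlyContent(readOnlyStream);
      }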

Helper class BlobWriteStreamSettings.

using System;

namespace Rebex.SimpleAzureBlobProvider.Tools
{
  public class BlobWriteStreamSettings
  {
    public BlobWriteStreamSettings(int maxUploadTasks = 1, int minUploadBatchSizeInBytes = 2 * 1024 * 1024)
    {
      if (maxUploadTasks <= 0)
      {
        throw new ArgumentOutOfRangeException(nameof(maxUploadTasks));
      }

      if (minUploadBatchSizeInBytes <= 0)
      {
        throw new ArgumentOutOfRangeException(nameof(minUploadBatchSizeInBytes));
      }

      MaxUploadTasks = maxUploadTasks;
      MinBatchSizeInBytes = minUploadBatchSizeInBytes;
    }

    public int MaxUploadTasks
    {
      get;
    }

    public int MinBatchSizeInBytes
    {
      get;
    }

    public static BlobWriteStreamSettings Default
    {
      get;
    } = new BlobWriteStreamSettings();
  }
}
by (190 points)
Thanks for your reply, I'll try to implement this in my code.
by (190 points)
Hello again, is it possible to get the total length of the uploaded file from the client? I couldn't get it in the GetContent method or in the custom stream class.
by (5.3k points)
Hello,
could you please describe the problem in more detail? If you need to know how many bytes were provided to the custom stream, you can accumulate the number of written bytes in a variable of your own:

public override void Write(byte[] buffer, int offset, int count)
{
    Interlocked.Add(ref _myTotalBytes, count);  // _myTotalBytes is e.g. a long field
    // Do the actual write here.
}
When the Stream is disposed, you have the total number of uploaded bytes in the _myTotalBytes variable.
Or do you need anything else?
by (190 points)
I'm trying to upload the file to my cloud account. I have to send the files in 1-megabyte chunks, and the upload API I use needs the chunk count as a parameter. For this reason I have to know the total length of the stream so I can calculate the number of chunks before the Write method is called.
by (5.3k points)
In general, the SFTP server doesn't know the length of the uploaded file before the upload is completed. Does your cloud provider really require the total file length for a multipart upload? AFAIK Azure Blob storage doesn't need it, and Amazon S3 doesn't require it either.

https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html

" After successfully uploading all relevant parts of an upload, you call this CompleteMultipartUpload operation to complete the upload. Upon receiving this request, Amazon S3 concatenates all the parts in ascending order by part number to create a new object. In the CompleteMultipartUpload request, you must provide the parts list and ensure that the parts list is complete. The CompleteMultipartUpload API operation concatenates the parts that you provide in the list. For each part in the list, you must provide the PartNumber value and the ETag value that is returned after that part was uploaded."

You upload numbered chunks, and then you commit these chunks. That's all.
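For illustration, the S3 flow is roughly this (a rough sketch using the AWS SDK for .NET; bucket, key and the helper class are placeholders and error handling is omitted):

using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Amazon.S3;
using Amazon.S3.Model;

class S3MultipartUploadSketch
{
  // Uploads the given parts as numbered chunks and then commits them.
  // Note that the total file length is never needed, only part numbers and ETags.
  public static async Task UploadAsync(IAmazonS3 s3, string bucket, string key, IEnumerable<Stream> parts)
  {
    var init = await s3.InitiateMultipartUploadAsync(new InitiateMultipartUploadRequest
    {
      BucketName = bucket,
      Key = key
    });

    var partETags = new List<PartETag>();
    int partNumber = 1;
    foreach (Stream part in parts)
    {
      var response = await s3.UploadPartAsync(new UploadPartRequest
      {
        BucketName = bucket,
        Key = key,
        UploadId = init.UploadId,
        PartNumber = partNumber,
        InputStream = part
      });
      partETags.Add(new PartETag(partNumber, response.ETag));
      partNumber++;
    }

    await s3.CompleteMultipartUploadAsync(new CompleteMultipartUploadRequest
    {
      BucketName = bucket,
      Key = key,
      UploadId = init.UploadId,
      PartETags = partETags
    });
  }
}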

If you really need to know the total length of the uploaded file and at the same time have to avoid caching the file locally, I am afraid your requirements contradict each other. You would need special clients that provide the file length through a separate information channel.
by (190 points)
Yes, I'm sure my cloud provider is not one of the familiar ones. Thanks for your help.
by (5.3k points)
Ahmet, you are welcome. It is not a problem that your cloud provider is not one of "the familiar" ones. It is a problem if this cloud provider needs to know the length of the file in advance to support partial uploads. If that is the case, you most likely have to change or abandon one of your requirements.
...