0 votes
by (170 points)

We'd like to use Rebex.FileServer to add virtual file server capability to our product. The idea is that our product behaves like a (write-only) SFTP server, but instead of writing the data to disk, it streams the data over the network using some other protocol.
The files can be quite big, so one requirement is that our solution must not read the entire file into memory before sending it to the destination. Instead, as soon as the client starts uploading, the data should be streamed through our product to the destination.

We already implemented the above for FTP with FtpServer and a virtual file system. With that implementation, when the client creates a file, the FTP library calls our application and we get a stream that we can read from.

Now we tried the same with Rebex.FileServer by subclassing ReadWriteFileSystemProvider. Here, when the client creates a file, we get a call to GetContent. But instead of getting a stream to read from, we have to provide a stream. There is also a SaveContent method to override, where we actually get a stream, but only after we have provided one in GetContent. The weird thing is that it does not seem to be the same stream.

Our current implementation looks like this:

protected override NodeContent GetContent(NodeBase node, NodeContentParameters contentParameters)
{
    MyOwnCode.ThrowIfNotValidFileNode(node);
    return NodeContent.CreateDelayedWriteContent(new MemoryStream());
}

protected override NodeBase SaveContent(NodeBase node, NodeContent content)
{
    MyOwnCode.ProvideStreamSoThatDataSinkCanStartReading(content.GetStream());
    MyOwnCode.BlockUntilDataSinkHasReadTheEntireStream();
    return node;
}

This seems to work, but there are a couple of things which are unclear:

  1. Why do we need to pass in a stream in GetContent, when a file is being written? Why don't we just get a call to SaveContent where we get access to a stream to read from?
  2. What happens to the stream we provide in GetContent? It looks like the stream we get in SaveContent is not the same stream that we passed in. So how is the stream we provide in GetContent used?
  3. Is it our responsibility to dispose the stream we provide in GetContent?
  4. Does the above implementation fulfil the requirement of not caching the entire file in memory before providing it to the data sink?

It is all a bit confusing. It would make much more sense if GetContent was called if (and only if) a client reads a file (so we provide a stream to read from); and SaveContent was called if (and only if) a client writes a file (and we get a stream to read from).

Applies to: File Server

2 Answers

+1 vote
by (5.3k points)

Welcome to the Rebex forum, Robert. Thanks for the question.

If I understand correctly, you need to:
1) Stream the uploaded file over another network protocol.
2) Avoid storing the file in the local file system.
3) Avoid holding the full file content in memory.

To fulfill these requirements, you should use an "immediate write stream".

For example, assume that you need to stream an uploaded file to Azure.

protected override NodeContent GetContent(NodeBase node, NodeContentParameters contentParams)
{
    if (contentParams.AccessType == NodeContentAccess.Read)
    {
        // You have a write-only file system, so reject read requests.
        // (The exception type here is only an example; throw whatever fits your code.)
        throw new NotSupportedException("This file system is write-only.");
    }

    var cloudBlobStream = getContainerReference(node.Parent.Path.FileName)
                          .GetBlockBlobReference(node.Name).OpenWrite();

    // CloudBlobStream uploads data immediately when its Write method is called.
    return NodeContent.CreateImmediateWriteContent(cloudBlobStream);
}

I don't know exactly how this part of your code works:

 MyOwnCode.ProvideStreamSoThatDataSinkCanStartReading(content.GetStream());

The solution might be quite simple: provide your own stream and upload the data from its Write method.

 class MySpecialStream : Stream
 {
 ...
 public override void Write(byte[] buffer, int offset, int count) => _myDataSink.Write(buffer, offset, count);

 }

In this scenario you can safely ignore the SaveContent method (see below).
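To make the idea of a forwarding stream more concrete, here is a minimal sketch of a complete write-only stream. The class name ForwardingWriteStream and the sink delegate are hypothetical stand-ins for your own data sink, and whether the file server ever queries Length or Position on an immediate-write stream is an assumption you would need to verify.

```csharp
using System;
using System.IO;

// Hypothetical write-only stream: each Write call is handed straight to a
// sink callback, so no file content accumulates in memory.
public sealed class ForwardingWriteStream : Stream
{
    // Hypothetical data sink; replace with your own network writer.
    private readonly Action<byte[], int, int> _sink;
    private long _bytesWritten;

    public ForwardingWriteStream(Action<byte[], int, int> sink)
    {
        _sink = sink ?? throw new ArgumentNullException(nameof(sink));
    }

    // Total number of bytes forwarded so far (useful for logging).
    public long BytesWritten => _bytesWritten;

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;

    // Assumption: the caller does not need Length/Position on this stream.
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        // Forward the chunk immediately; this blocks for as long as the sink blocks.
        _sink(buffer, offset, count);
        _bytesWritten += count;
    }

    // Nothing is buffered here, so there is nothing to flush.
    public override void Flush() { }

    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
}
```

An instance of such a stream would be what you pass to NodeContent.CreateImmediateWriteContent in GetContent. Because Write blocks while the sink is busy, a slow destination slows down the upload instead of growing a buffer.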

Now, I will try to answer your questions.

1) The GetContent method has a NodeContentParameters argument that contains the AccessType required by the file server operation (Read, Write, or both).

To put it another way, here is an analogy:
Imagine a hypothetical (Virtual)File.Open(...requiredAccess...) method. GetContent(...requiredAccess...) is the method that provides the result with the required access.

The SaveContent method is a kind of fallback. As documented, SaveContent is called only when you return a stream using the CreateDelayedWriteContent call in GetContent. The word 'Delayed' in the method name roughly means: "I don't have a stream that can save content immediately (e.g. FileStream). I am using some other stream, and I promise (as the author of the virtual file system) that I will save the content of this stream later, when the SaveContent method is called."

2) a) The GetContent method returns a delayed write stream (in your case a MemoryStream).
b) The provided stream is wrapped in a decorator to ensure that SaveContent is called before the stream is allowed to be disposed.
c) The file server closes (disposes) the stream, and SaveContent is called with our decorator. That is why the stream you see in SaveContent is not the same object you passed in.

3) It follows from point 2 that Dispose is called by the file server, so you don't have to call it yourself. If you don't like this behavior, you can wrap your stream in your own decorator that has an empty Dispose method.
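A decorator with an empty Dispose could look roughly like the sketch below. The class name LeaveOpenStream is hypothetical; the code only uses the standard System.IO.Stream base class and is not part of the Rebex API.

```csharp
using System;
using System.IO;

// Hypothetical decorator: forwards every operation to the inner stream,
// but leaves the inner stream open when the decorator itself is disposed.
public sealed class LeaveOpenStream : Stream
{
    private readonly Stream _inner;

    public LeaveOpenStream(Stream inner)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
    }

    public override bool CanRead => _inner.CanRead;
    public override bool CanSeek => _inner.CanSeek;
    public override bool CanWrite => _inner.CanWrite;
    public override long Length => _inner.Length;
    public override long Position
    {
        get => _inner.Position;
        set => _inner.Position = value;
    }

    public override void Flush() => _inner.Flush();
    public override int Read(byte[] buffer, int offset, int count) => _inner.Read(buffer, offset, count);
    public override void Write(byte[] buffer, int offset, int count) => _inner.Write(buffer, offset, count);
    public override long Seek(long offset, SeekOrigin origin) => _inner.Seek(offset, origin);
    public override void SetLength(long value) => _inner.SetLength(value);

    protected override void Dispose(bool disposing)
    {
        // Intentionally does NOT dispose _inner; the code that created the
        // inner stream stays responsible for disposing it.
        base.Dispose(disposing);
    }
}
```

With this wrapper, the file server can dispose the stream it was given while your own code keeps control over the lifetime of the underlying stream.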

4) No. I hope the explanation above makes it clear that you are caching the file in memory, because you are using:
a) The CreateDelayedWriteContent method.
b) A MemoryStream.

As I wrote above, please use the CreateImmediateWriteContent method and a suitable stream.

I am sorry for the confusion. We are aware that this is probably the most complicated and unintuitive part of our virtual file system. The reason is that we must support many different scenarios in many contexts, and a write-only, pass-through virtual file system is one niche scenario. We tried to find a common denominator, and your feedback is very valuable to us.

I hope this helps. Don't hesitate to contact us again.

by (170 points)
Hi renestein, thanks a lot for your very detailed answer! Getting such great support makes us feel confident that it is the right decision to go with your product and buy the license!
The main difference between your Azure example and our code is that with Azure you seem to get a stream from the sink that your library can write to. But in our case we need to provide our sink with a stream that it can read from (push vs. pull).
I think your idea with that custom stream will solve my problem. I will try to implement that tomorrow with a fresh mind.
Thanks again!
by (5.3k points)
You are welcome, Robert. Thanks for your kind words.
0 votes
by (170 points)

Here is what I came up with, based on the inspiration I got from renestein. Writing the custom stream was trickier than expected (and I'm still not 100% sure if all the locking stuff is correct), but first tests look promising.

So I'm no longer using SaveContent, and my new GetContent looks something like this:

protected override NodeContent GetContent(NodeBase node, NodeContentParameters contentParameters)
{
    if (contentParameters.AccessType == NodeContentAccess.Read)
    {
        return NodeContent.CreateReadOnlyContent(new MemoryStream());
    }
    else
    {
        var stream = new MyFileContentStream();
        MyOwnCode.ProvideStreamSoThatDataSinkCanStartReading(stream);
        return NodeContent.CreateImmediateWriteContent(stream);
    }
}

And this is the new stream class which allows reading from my data sink and writing by your library at the same time:

internal class MyFileContentStream : Stream
{

    private byte[] CurrentBuffer = null;
    private int ReadPointer = 0;

    private readonly AutoResetEvent NotifyBufferEmpty = new AutoResetEvent(true);
    private readonly ManualResetEvent NotifyDataAvailable = new ManualResetEvent(false);

    public override void Write(byte[] buffer, int offset, int count)
    {
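        // an empty write must not create an empty buffer, which Read would report as end of stream
        if (count == 0)
            return;

        // block until local buffer is empty
        NotifyBufferEmpty.WaitOne();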

        // copy bytes
        CurrentBuffer = new byte[count];
        ReadPointer = 0;
        Buffer.BlockCopy(buffer, offset, CurrentBuffer, 0, count);

        // notify new data is available
        NotifyDataAvailable.Set();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        // block until data available
        NotifyDataAvailable.WaitOne();

        // notify end of stream when no more buffer available
        if (CurrentBuffer == null)
            return 0;

        // copy bytes
        var availableBytes = CurrentBuffer.Length - ReadPointer;
        var readBytes = Math.Min(count, availableBytes);
        Buffer.BlockCopy(CurrentBuffer, ReadPointer, buffer, offset, readBytes);
        ReadPointer += readBytes;

        // check if we have copied all bytes of the current buffer
        if (CurrentBuffer.Length == ReadPointer)
        {
            CurrentBuffer = null;

            // make sure we can no longer read
            NotifyDataAvailable.Reset();

            // notify buffer empty
            NotifyBufferEmpty.Set();
        }

        return readBytes;
    }

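    public override void Flush()
    {
        // nothing to flush: Write hands each chunk directly to the reader.
        // End of stream is signalled in Dispose, so a Flush in the middle
        // of an upload cannot make the reader see a premature end of stream.
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            // wait until the reader has consumed the last chunk, then signal end of stream
            NotifyBufferEmpty.WaitOne();
            CurrentBuffer = null;
            ReadPointer = 0;
            NotifyBufferEmpty.Set();
            NotifyDataAvailable.Set();
        }
        base.Dispose(disposing);
    }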

    public override bool CanRead => true;

    public override bool CanSeek => false;

    public override bool CanWrite => true;

    public override long Length => 0;

    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }

    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();

    public override void SetLength(long value) => throw new NotSupportedException();

}
by (5.3k points)
Hello Robert, thanks for sharing.
The virtual file system part looks OK. I am a little worried about the thread synchronization. Is it necessary to use low-level thread synchronization constructs such as ManualResetEvent? It is easy to miss a signal or to introduce a deadlock or lock convoy later. I don't know your target environment, but I think a variant of TransferStream could be handy.
https://code.msdn.microsoft.com/Samples-for-Parallel-b4b76364/sourcecode?fileId=44488&pathId=1643499526
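As a rough illustration of what such a variant might look like, here is a sketch built on BlockingCollection from the .NET base class library instead of hand-written events. It is not the TransferStream sample itself; the class name BoundedPipeStream and the maxPendingChunks parameter are hypothetical. It assumes exactly one writing thread (the file server) and one reading thread (the data sink).

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;

// Hypothetical single-writer/single-reader pipe: the server writes chunks,
// the data sink reads them, and at most maxPendingChunks chunks wait in between.
public sealed class BoundedPipeStream : Stream
{
    private readonly BlockingCollection<byte[]> _chunks;
    private byte[] _current;   // chunk currently being consumed by the reader
    private int _offset;       // read position inside _current

    public BoundedPipeStream(int maxPendingChunks = 16)
    {
        _chunks = new BlockingCollection<byte[]>(maxPendingChunks);
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        if (count == 0) return;
        var copy = new byte[count];
        Buffer.BlockCopy(buffer, offset, copy, 0, count);
        _chunks.Add(copy);     // blocks while the queue is full (back-pressure)
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count == 0) return 0;
        if (_current == null)
        {
            // Blocks until a chunk arrives; returns false once the writer
            // has completed and the queue is empty, which means end of stream.
            if (!_chunks.TryTake(out _current, Timeout.Infinite)) return 0;
            _offset = 0;
        }
        int n = Math.Min(count, _current.Length - _offset);
        Buffer.BlockCopy(_current, _offset, buffer, offset, n);
        _offset += n;
        if (_offset == _current.Length) _current = null;
        return n;
    }

    protected override void Dispose(bool disposing)
    {
        // Disposing (done by the writer side) marks the end of the data.
        // The queue itself is not disposed, so the reader can still drain it.
        if (disposing) _chunks.CompleteAdding();
        base.Dispose(disposing);
    }

    public override void Flush() { }
    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
}
```

Memory use is bounded by maxPendingChunks times the size of the chunks the server writes. Unlike the event-based version above, Dispose here does not wait until the sink has read everything, so if the upload must only be reported as successful after the sink has finished, an additional wait would be needed.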
...