0 votes
by (1.9k points)

I am very confused about GetFileAsync when using SFtp.
If two tasks are executed and 'A' task is canceled and 'B' task is in a block state(writing is blocked for other io),
the 'A' task Wait() call is not returned until 'B' task is released.
I do not seem to understand the Async function.
In the example below, I would like you to help me understand what I do not understand.
(BeginGetFile, EndGetFile is same issue. EndGetFile is blocked)

class Program
{
    static void Main(string[] args)
    {
        Task task1 = null;
        Task task2 = null;

        Sftp sftp = Connect();
        if (sftp == null)
        {
            Console.ReadKey();
            return;
        }

        try
        {
            string filename = $"/home/011.jpg";
            string asyncState = filename;
            task1 = sftp.GetFileAsync(filename, new WaitStream(), 0, 512 * 1024, asyncState);
            Console.WriteLine($"{DateTime.Now.ToString()} Wait Task Created");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"{DateTime.Now.ToString()} Wait Task Created Error : {ex.Message}");
        }

        try
        {
            string filename = $"/home/019.jpg";
            string asyncState = filename;
            task2 = sftp.GetFileAsync(filename, new NoWaitStream(), 0, 512 * 1024, asyncState);
            Console.WriteLine($"{DateTime.Now.ToString()} NoWait Task Created");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"{DateTime.Now.ToString()} NoWait Task Created Error : {ex.Message}");
        }

        try
        {
            sftp.AbortTransfer(task2.AsyncState);
            Console.WriteLine($"{DateTime.Now.ToString()} NoWait Task Aborted");
            task2.Wait();
            Console.WriteLine($"{DateTime.Now.ToString()} NoWait Task Ended");
        }
        catch (Exception ex)
        {
            if (ex.InnerException != null)
                Console.WriteLine($"{DateTime.Now.ToString()} NoWait Task Wait Error : {ex.InnerException.Message}");
            else
                Console.WriteLine($"{DateTime.Now.ToString()} NoWait Task Wait Error : {ex.Message}");
        }

        try
        {
            sftp.AbortTransfer(task1.AsyncState);
            Console.WriteLine($"{DateTime.Now.ToString()} Wait Task Aborted");
            task1.Wait();
            Console.WriteLine($"{DateTime.Now.ToString()} Wait Task Ended");
        }
        catch (Exception ex)
        {
            if (ex.InnerException != null)
                Console.WriteLine($"{DateTime.Now.ToString()} Wait Task Wait Error : {ex.InnerException.Message}");
            else
                Console.WriteLine($"{DateTime.Now.ToString()} Wait Task Wait Error : {ex.Message}");
        }

        Console.WriteLine($"{DateTime.Now.ToString()} Finished");
        Console.ReadKey();
    }

    static Sftp Connect()
    {
        try
        {
            Sftp sftp = new Sftp();

            sftp.MaxDownloadSpeed = 0;
            sftp.MaxUploadSpeed = 0;
            sftp.Settings.UseLargeBuffers = true;
            sftp.Encoding = Encoding.UTF8;

            sftp.Connect("192.168.0.8", 22);

            sftp.Login("test", "test");

            return sftp;
        }
        catch(Exception ex)
        {
            Console.WriteLine($"Connect Failed : " + ex.Message);
            return null;
        }
    }

    private class NoWaitStream : Stream
    {
        public override bool CanRead
        {
            get { return false; }
        }

        public override bool CanSeek
        {
            get { return false; }
        }

        public override bool CanWrite
        {
            get { return true; }
        }

        public override void Flush()
        {
        }

        public override long Length
        {
            get { throw new NotSupportedException(); }
        }

        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
        }

        public override void Close()
        {
            base.Close();
        }
    }

    private class WaitStream : Stream
    {
        public override bool CanRead
        {
            get { return false; }
        }

        public override bool CanSeek
        {
            get { return false; }
        }

        public override bool CanWrite
        {
            get { return true; }
        }

        public override void Flush()
        {
        }

        public override long Length
        {
            get { throw new NotSupportedException(); }
        }

        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            Thread.Sleep(10000);
        }

        public override void Close()
        {
            base.Close();
        }
    }
}

In my case, GetDownloadStream / GetUploadStream works very well for my processing. However, performance is unsatisfactory (1 Gbps Wired Connection).
The GetDownloadStream / GetUploadStream function uses Ping / Pong Communication, so performance does not work properly.
I would like to ask if you can create GetDownloadStream / GetUploadStream using asynchronous communication like GetFile/PutFile.
For example :

Stream stream = sftp.GetBufferdDownloadStream();
...

// Cancel Transfer
stream.Close(); // don't send unsent acks

// Normal Close
stream.Flush(); // send all unsent acks
stream.Close();

Thank you.

Applies to: Rebex SFTP

2 Answers

0 votes
by (147k points)

Unlike PutFile / GetFile, stream-based methods such as GetDownloadStream / GetUploadStream / GetStream methods were not optimized for speed.

We plan to eventually address this limitation, but this is not yet in our near-term plan.

However, with a bit of custom code, it's already possible to utilize GetFile and PutFile to achieve what you need. Please check out the sample code that implements a GetUploadStream-like API using PutFile method. An equivalent approach could be used for GetFile as well.

0 votes
by (1.9k points)

EndGetFile of 'A' Task is blocked like the above example.
This may be a limitation of the Async function.
In another test,
Using single SFTP connection
There is no way for the AbortTransfer () function to cancel a specific GetFile when calling GetFile () on each thread.
I still can not solve the problem of cancellation of download / upload in case of single SFTP Connect.
I tried using SFTP Bind.
In this case, the AbortTransfer () function works because each SFTP Instance is used.
However, if I do a lot of Bind, I will get a Bind failure.
Is there a good way to cancel a transfer?

by (147k points)
Calling AbortTransfer does not end the operation immediately, but only as soon as it's done processing the current block of data. Setting MaxDownloadSpeed/MaxUploadSpeed to minimum allowed value and sleeping for 10 seconds in a Write method can cause this to take a very long time.
by (147k points)
It's indeed not possible to specify which transfer to cancel using AbortTransfer() method when the transfers have been initiated by calling the synchronous GetFile/PutFile method. However, the sample code linked above uses the Begin/End asynchronous variant.
by (147k points)
The Bind method is just an alternative to Connect/Login. This does not change the behavior of synchronous or asynchronous GetFile/PutFile or AbortTransfer at all.
by (1.9k points)
Thanks for the answer.
However, I still do not know how to do asynchronous processing on a Single Instance.
I will inform you after various tests.
by (1.9k points)
I found the cause of my program problem in Async functions.
The buffer in Rebex.IO.Pipe is a fixed size.
If the buffer is full, the write function is blocked.
If the buffer is empty, the read function is blocked.
In this blocked situation, functions such as task.Wait and EndGetFile are also blocked.
I think this is a normal operation of Async task.

I will try to figure out how to solve this problem. (I will consider temporary files.)

Suggestion:
If stream(Rebex.IO.Pipe) buffer is full, how about throwing a PauseTransfer Exception or a BufferFull (download) / BufferEmpty (Upload) Exception to pause transfer and call again next time?


for example:

enum Rebex.Net.Sftp.BufferState
{
    Normal = 0,
    Empty,
    Full
}

var task = sftp.GetFileAsync(filename, pipe.Input, start, length, asyncstate);

...

pipe.Output.Read(...);
sftp.ContinueAsync(asyncstate); --> continue transfer and call stream write function again

...


Rebex.IO.Pipe :

void Write(buffer, offset, length)
{
    if(buffer.Remaining < length)
        throw new Rebex.Net.BufferException(Rebex.Net.Sftp.BufferState.Full, callAgain : true); --> pause transfer and must call again with same parameters.
    ...

    write to internal buffer;

    if(buffer.Remaining < max write size)
        throw new Rebex.Net.BufferException(Rebex.Net.Sftp.BufferState.Full, callAgain : false); --> just pause transfer
    ...
}

Or supporting a new Stream object that can control the state of the buffer.

I'm really sorry.
Thank you.
by (147k points)
Thanks for pointing this out. In most scenarios, the supplying or receiving end of the Pipe is continuously supplying or receiving data, so the blockage does not occur. But this is not the case in all scenarios, as your code demonstrated.

However, I think there is another way to solve this issue that does not require any modifications to Rebex SFTP: Make the Pipe class abortable, and when aborting a PutFileAsync/GetFileAsync/BeginPutFile/BeginGetFile operation, cancel the Pipe as well. Aborting the Pipe would make the blocked Write or Read operation fail immediately, solving the issue.

This should be simple to implement by replacing the synchronous Read/Write calls inside the Pipe with ReadAsync/WriteAsync and Wait(), where the wait would also end when the Pipe has been canceled. Please let me know if this is not clear enough or if you have trouble implementing this.
by (1.9k points)
edited by
Thank you for your kind reply.
I am also fully aware of this Cancellation.
When I say exactly,
User requests arrive in serial. Think of it as File IO.

Multiple Open -> GetFileAsync
Multiple Read -> Read Stream
Multiple Close -> Cancel (or Done) and Wait Task.

Each IO request arrives sequentially (in any order).

(1) Call two GetFileAsync.
(2) read data from only one task.
(2) Cancels only one task from two tasks.
The other is waiting for IO.
(3) Wait the canceled task.

Open File A : GetFileAsync("A");
Open File B : GetFileAsync("B");
Read File A : streamA.Read();
// only read file A
...
Close File A : Pipe.Close(); taskA.Wait("A"); // blocked if "B" stream buffer is full
Read File B : streamB.Read();
Close File B : Pipe.Close(); taskA.Wait("A");


Pipe in the canceled task have been shut down (confirmed.).
But Task.Wait function does not return.

Because another task is in IO Wait, it is in an infinite wait state. This task should not be canceled.
(Thread monitor : second task's pipe is blocked because write buffer is full.)

What I want is that the Wait function of the canceled Task must be returned.
This is because the structure of Task is different from Thread.

I am avoiding this issue using sftp.Bind(new instance) and Thread(GetFile and AbortTransfer)

OnOpen()
{
    // if use Connect() -> cause user prompt (OTP).
    var sftp = main.BindNew(); // new sftp instance if bind(main.session) success or null if failed (too many bind).
    if(sftp != null) // if success, create thread and call get file
    {
        ...
        var pipe = new Pipe();
        ...
        var thread = new thread(()=>
        sftp.GetFile( pipe.Input )
        );
        thread.start;
        ...
    }
    else // can not create new instance : use direct stream.
    {
        stream = main.GetDownloadStream(); // download performance issue.
        ...
    }
}

OnClose(bool canceled)
{
    if(canceled)
    {
        if(sftp != main)
        {
            pipe.Input.Close(); // pipe stream
            pipe.Output.Close(); // pipe stream
            sftp.AbortTransfer();
            thread.Join();
        }
        else
        {
            stream.Close();
        }
    }
    else
    {
        if(sftp != main)
        {
            pipe.Input.Close(); // pipe stream
            pipe.Output.Close(); // pipe stream
            stream.Close(); // pipe stream
            thread.Join();
        }
        else
        {
            stream.Close();
        }
    }
}

Additionally,
The Begin / End function also encountered the same problem in the same test. (For the same reason)

A similar problem occurred when calling GetFile on a single instance using Thread. This problem is probably caused by the same reason or because AbortTransfer can not be called.
by (147k points)
It's quite difficult to debug code we can't see and can't run. I will try to implement the workaround with modified Pipe and post it here. Basically, what you need is an equivalent of GetUploadStream/GetDownloadStream that is as fast as GetFile/PutFile, right?
by (1.9k points)
Yes. I need  GetUploadStream/GetDownloadStream that is as fast as GetFile/PutFile.
For your debug, if you want my test codes. I will send project files to you.
by (147k points)
Please check out GetFastUploadStream/GetFastDownloadStream extension methods:
https://www.rebex.net/getfile/0a351b68156e42dd8a9fc885ae34c7ae/SftpExt.cs
by (1.9k points)
I can not download it. (404 Not Found)
by (147k points)
Sorry, it looks like our downloader has troubles with .cs files. Re-uploaded as ZIP:
https://www.rebex.net/getfile/a03b82055b514cacabe561d5cdef1e2d/SftpExt.zip
by (1.9k points)
I tested the source you provided.
This code also uses asynchronous functions and has the same blocking problem.
I suspect it's a problem for me to call an asynchronous function synchronously.
All of the test code below is called synchronously.
It will stop at tcs.Task.Wait ().
An asynchronous function is only likely to be used only when used globally asynchronously.
In my case, it seems preferable to use a synchronous function (GetDownloadStream) or MultiConnection/Thread/AbortTransfer Combination.
I do not want you to worry about this problem anymore.
We hope to develop faster synchronization functions than GetDownloadStream.
Thank you.


static void Test4()
{
    Sftp sftp = Connect();
    if (sftp == null)
    {
        Console.ReadKey();
        return;
    }

    int maxCount = 4;

    List<Stream> list = new List<Stream>();

    for (int i = 0; i < maxCount; i++)
    {
        try
        {
            string filename = $"/home/test2/{i + 1:D3}" + ".jpg"; // filesize : 1Mbytes
            Stream stream = Rebex.Net.SftpExt.GetFastDownloadStream(sftp, filename);
            list.Add(stream);
            Console.WriteLine($"Task {i} Created : " + filename);
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Task {i} Created Error : " + ex.Message);
        }
    }

    //byte[] buffer = new byte[65536];
    //int readn = 0;
    //Console.WriteLine($"Read 0 Start");
    //readn = list[0].Read(buffer, 0, 65536);
    //Console.WriteLine($"Read 0 Done : " + readn);

    Thread.Sleep(5000);

    for (int i = 0; i < maxCount; i++)
    {
        try
        {
            Console.WriteLine($"Task {i} Wait");
            Stream stream = list[i];
            stream.Close();
            Console.WriteLine($"Task {i} Ended");
        }
        catch (Exception ex)
        {
            if (ex.InnerException != null)
                Console.WriteLine($"Task {i} Wait Error : " + ex.InnerException.Message);
            else
                Console.WriteLine($"Task {i} Wait Error : " + ex.Message);
        }
    }

    Thread.Sleep(5000);

    sftp.Disconnect();
    sftp.Dispose();
    Console.WriteLine($"Finished");
    Console.ReadKey();
}
...