SFtp GetFileAsync and task.Wait() call is blocked.

0 votes
asked Nov 24 by lanopk (820 points)

I am very confused about GetFileAsync when using SFtp.
If two tasks are executed and 'A' task is canceled and 'B' task is in a block state(writing is blocked for other io),
the 'A' task Wait() call is not returned until 'B' task is released.
I do not seem to understand the Async function.
In the example below, I would like you to help me understand what I do not understand.
(BeginGetFile, EndGetFile is same issue. EndGetFile is blocked)

class Program
{
    static void Main(string[] args)
    {
        Task task1 = null;
        Task task2 = null;

        Sftp sftp = Connect();
        if (sftp == null)
        {
            Console.ReadKey();
            return;
        }

        try
        {
            string filename = $"/home/011.jpg";
            string asyncState = filename;
            task1 = sftp.GetFileAsync(filename, new WaitStream(), 0, 512 * 1024, asyncState);
            Console.WriteLine($"{DateTime.Now.ToString()} Wait Task Created");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"{DateTime.Now.ToString()} Wait Task Created Error : {ex.Message}");
        }

        try
        {
            string filename = $"/home/019.jpg";
            string asyncState = filename;
            task2 = sftp.GetFileAsync(filename, new NoWaitStream(), 0, 512 * 1024, asyncState);
            Console.WriteLine($"{DateTime.Now.ToString()} NoWait Task Created");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"{DateTime.Now.ToString()} NoWait Task Created Error : {ex.Message}");
        }

        try
        {
            sftp.AbortTransfer(task2.AsyncState);
            Console.WriteLine($"{DateTime.Now.ToString()} NoWait Task Aborted");
            task2.Wait();
            Console.WriteLine($"{DateTime.Now.ToString()} NoWait Task Ended");
        }
        catch (Exception ex)
        {
            if (ex.InnerException != null)
                Console.WriteLine($"{DateTime.Now.ToString()} NoWait Task Wait Error : {ex.InnerException.Message}");
            else
                Console.WriteLine($"{DateTime.Now.ToString()} NoWait Task Wait Error : {ex.Message}");
        }

        try
        {
            sftp.AbortTransfer(task1.AsyncState);
            Console.WriteLine($"{DateTime.Now.ToString()} Wait Task Aborted");
            task1.Wait();
            Console.WriteLine($"{DateTime.Now.ToString()} Wait Task Ended");
        }
        catch (Exception ex)
        {
            if (ex.InnerException != null)
                Console.WriteLine($"{DateTime.Now.ToString()} Wait Task Wait Error : {ex.InnerException.Message}");
            else
                Console.WriteLine($"{DateTime.Now.ToString()} Wait Task Wait Error : {ex.Message}");
        }

        Console.WriteLine($"{DateTime.Now.ToString()} Finished");
        Console.ReadKey();
    }

    static Sftp Connect()
    {
        try
        {
            Sftp sftp = new Sftp();

            sftp.MaxDownloadSpeed = 0;
            sftp.MaxUploadSpeed = 0;
            sftp.Settings.UseLargeBuffers = true;
            sftp.Encoding = Encoding.UTF8;

            sftp.Connect("192.168.0.8", 22);

            sftp.Login("test", "test");

            return sftp;
        }
        catch(Exception ex)
        {
            Console.WriteLine($"Connect Failed : " + ex.Message);
            return null;
        }
    }

    private class NoWaitStream : Stream
    {
        public override bool CanRead
        {
            get { return false; }
        }

        public override bool CanSeek
        {
            get { return false; }
        }

        public override bool CanWrite
        {
            get { return true; }
        }

        public override void Flush()
        {
        }

        public override long Length
        {
            get { throw new NotSupportedException(); }
        }

        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
        }

        public override void Close()
        {
            base.Close();
        }
    }

    private class WaitStream : Stream
    {
        public override bool CanRead
        {
            get { return false; }
        }

        public override bool CanSeek
        {
            get { return false; }
        }

        public override bool CanWrite
        {
            get { return true; }
        }

        public override void Flush()
        {
        }

        public override long Length
        {
            get { throw new NotSupportedException(); }
        }

        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            Thread.Sleep(10000);
        }

        public override void Close()
        {
            base.Close();
        }
    }
}

In my case, GetDownloadStream / GetUploadStream works very well for my processing. However, performance is unsatisfactory (1 Gbps Wired Connection).
The GetDownloadStream / GetUploadStream function uses Ping / Pong Communication, so performance does not work properly.
I would like to ask if you can create GetDownloadStream / GetUploadStream using asynchronous communication like GetFile/PutFile.
For example :

Stream stream = sftp.GetBufferdDownloadStream();
...

// Cancel Transfer
stream.Close(); // don't send unsent acks

// Normal Close
stream.Flush(); // send all unsent acks
stream.Close();

Thank you.

Applies to: Rebex SFTP

2 Answers

0 votes
answered Nov 26 by Lukas Pokorny (96,250 points)

Unlike PutFile / GetFile, stream-based methods such as GetDownloadStream / GetUploadStream / GetStream methods were not optimized for speed.

We plan to eventually address this limitation, but this is not yet in our near-term plan.

However, with a bit of custom code, it's already possible to utilize GetFile and PutFile to achieve what you need. Please check out the sample code that implements a GetUploadStream-like API using PutFile method. An equivalent approach could be used for GetFile as well.

0 votes
answered Nov 26 by lanopk (820 points)

EndGetFile of 'A' Task is blocked like the above example.
This may be a limitation of the Async function.
In another test,
Using single SFTP connection
There is no way for the AbortTransfer () function to cancel a specific GetFile when calling GetFile () on each thread.
I still can not solve the problem of cancellation of download / upload in case of single SFTP Connect.
I tried using SFTP Bind.
In this case, the AbortTransfer () function works because each SFTP Instance is used.
However, if I do a lot of Bind, I will get a Bind failure.
Is there a good way to cancel a transfer?

commented Dec 3 by lanopk (820 points)
Yes. I need  GetUploadStream/GetDownloadStream that is as fast as GetFile/PutFile.
For your debug, if you want my test codes. I will send project files to you.
commented 4 days ago by Lukas Pokorny (96,250 points)
Please check out GetFastUploadStream/GetFastDownloadStream extension methods:
https://www.rebex.net/getfile/0a351b68156e42dd8a9fc885ae34c7ae/SftpExt.cs
commented 4 days ago by lanopk (820 points)
I can not download it. (404 Not Found)
commented 4 days ago by Lukas Pokorny (96,250 points)
Sorry, it looks like our downloader has troubles with .cs files. Re-uploaded as ZIP:
https://www.rebex.net/getfile/a03b82055b514cacabe561d5cdef1e2d/SftpExt.zip
commented 4 days ago by lanopk (820 points)
I tested the source you provided.
This code also uses asynchronous functions and has the same blocking problem.
I suspect it's a problem for me to call an asynchronous function synchronously.
All of the test code below is called synchronously.
It will stop at tcs.Task.Wait ().
An asynchronous function is only likely to be used only when used globally asynchronously.
In my case, it seems preferable to use a synchronous function (GetDownloadStream) or MultiConnection/Thread/AbortTransfer Combination.
I do not want you to worry about this problem anymore.
We hope to develop faster synchronization functions than GetDownloadStream.
Thank you.


static void Test4()
{
    Sftp sftp = Connect();
    if (sftp == null)
    {
        Console.ReadKey();
        return;
    }

    int maxCount = 4;

    List<Stream> list = new List<Stream>();

    for (int i = 0; i < maxCount; i++)
    {
        try
        {
            string filename = $"/home/test2/{i + 1:D3}" + ".jpg"; // filesize : 1Mbytes
            Stream stream = Rebex.Net.SftpExt.GetFastDownloadStream(sftp, filename);
            list.Add(stream);
            Console.WriteLine($"Task {i} Created : " + filename);
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Task {i} Created Error : " + ex.Message);
        }
    }

    //byte[] buffer = new byte[65536];
    //int readn = 0;
    //Console.WriteLine($"Read 0 Start");
    //readn = list[0].Read(buffer, 0, 65536);
    //Console.WriteLine($"Read 0 Done : " + readn);

    Thread.Sleep(5000);

    for (int i = 0; i < maxCount; i++)
    {
        try
        {
            Console.WriteLine($"Task {i} Wait");
            Stream stream = list[i];
            stream.Close();
            Console.WriteLine($"Task {i} Ended");
        }
        catch (Exception ex)
        {
            if (ex.InnerException != null)
                Console.WriteLine($"Task {i} Wait Error : " + ex.InnerException.Message);
            else
                Console.WriteLine($"Task {i} Wait Error : " + ex.Message);
        }
    }

    Thread.Sleep(5000);

    sftp.Disconnect();
    sftp.Dispose();
    Console.WriteLine($"Finished");
    Console.ReadKey();
}
...