İlkay İlknur

.NET Core'da Pipe(System.IO.Pipelines) Kullanımı

Temmuz 22, 2020

Bu yazımızda .NET Core 2.1 ile beraber gelen System.IO.Pipelines namespace'i altına bulunan Pipe yapısını inceleyeceğiz. Pipe'lar kısaca .NET içerisinde performanslı IO yapmamızı sağlayan bir yapı. Geleneksel yöntemlerle streaming datasını parse eden uygulamalarda daha az allocationa neden olan ve performanslı çalışan bir kod yazmamız gerektiğinde pek çok farklı implementasyon yapmamız gerekirken Pipe yapısı bu implementasyonların hepsini built-in olarak içerisinde barındırması itibariyle bize güzel ve kullanımı kolay bir altyapı sağlıyor.

Şimdi gelin ilk olarak Pipeline'lar olmadan önce nasıl kod yazıyorduk onu inceleyelim. Bir network streamden satır satır data okuyup işleyen şu kodu düşünelim.

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);
 
    // Process a single line from the buffer
    ProcessLine(buffer);
}

Bu koddaki sorunlara baktığımızda şunlar karşımıza çıkıyor.

  • Read operasyonundan dönen data içerisinde birden fazla satır data olabilir.
  • Read operasyonundan dönen datada bir satır sonu bulunmayabilir ve satır sonu bulunana kadar okumaya devam etmek gerekebilir.

İlk maddedeki sorunu çözmek istediğimizde aşağıdaki gibi bir implementasyon yapabiliriz.

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    var bytesBuffered = 0;
    var bytesConsumed = 0;
 
    while (true)
    {
        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, buffer.Length - bytesBuffered);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }
        // Keep track of the amount of buffered bytes
        bytesBuffered += bytesRead;
 
        var linePosition = -1;
 
        do
        {
            // Look for a EOL in the buffered data
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed);
 
            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset
                var lineLength = linePosition - bytesConsumed;
 
                // Process the line
                ProcessLine(buffer, bytesConsumed, lineLength);
 
                // Move the bytesConsumed to skip past the line we consumed (including \n)
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

Read operasyonu sonucunda birden fazla satır gelmesi sorununu çözdük. Şimdi gelelim diğer soruna. Bir satır eğer 1024 bytetan büyük ise satır sonuna kadar okumaya devam edip elimizdeki bufferı resize etmemiz gerekiyor.

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;
 
    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer
        var bytesRemaining = buffer.Length - bytesBuffered;
 
        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }
 
        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }
 
        // Keep track of the amount of buffered bytes
        bytesBuffered += bytesRead;
 
        do
        {
            // Look for a EOL in the buffered data
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed);
 
            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset
                var lineLength = linePosition - bytesConsumed;
 
                // Process the line
                ProcessLine(buffer, bytesConsumed, lineLength);
 
                // Move the bytesConsumed to skip past the line we consumed (including \n)
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

Bu implementasyona baktığımızda da oldukça fazla buffer kopyalaması yaptığımızı görüyoruz. Bunun da çözümü arkada bir buffer listesi tutup kopyalama işlemlerini ve memory kullanımını azaltmak. Yani özetle baktığımızda etkin, performanslı, az allocationa neden olan bir kod yazmaya kalktığımızda pek çok şeyi düşünüp implemente edip, doğru çalıştığından emin olmamız gerekiyor. Bu da oldukça zor. Şimdi gelelim pipeline'lar bu konuyu nasıl çözüyorlar ona bakalım.

System.IO.Pipelines

Yukarıda yaptığımız implementasyona nazaran Pipelineların en büyük avantajı buffer yönetimini arka planda etkin bir biçimde kendisinin yapması. Bu da bize hem performanslı hem de daha az memory tüketen bir altyapı sağlıyor. Aynı zamanda kod yazarken performans ve etkinlik gibi konulara odaklanmak yerine business tarafına daha çok odaklanmamızı sağlamakta.

Pipe yapısını kullanabilmemiz için öncelikle projemize System.IO.Pipelines nuget paketini yüklememiz gerekiyor. Paketi yükledikten sonra kolayca aşağıdaki gibi Pipe yaratabiliyoruz.

var pipe = new Pipe();

Pipe yarattıktan içerisinde Writer ve Reader isimli iki property görüyoruz. PipeWriter bizim pipe'a data yazmamızı sağlarken PipeReader ise pipe'tan data okumamızı sağlayan kısım. Önce pipe'a veri yazma kısmına bakalım.

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    while (true)
    {
        Memory<byte> memory = writer.GetMemory();
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }
 
        FlushResult result = await writer.FlushAsync();
 
        if (result.IsCompleted)
        {
            break;
        }
    }
 
    writer.Complete();
}

Buradaki koda baktığımızda ilk olarak PipeWriterdan bir buffer istiyoruz. Buradaki buffer array pooldan geliyor. Sonrasında socket üzerinden ReceiveAsync metodunu çağırarak verinin buffera yazılmasını sağlıyoruz. Burada buffer Memory<T> tipinde olduğu için buffer yaratma veya kopyalama gibi sorunlarla uğraşmıyoruz. Advance metodunu kullanarak writera ne kadar data yazıldığını söylüyoruz. Böylece arkadaki bufferların etkin bir şekilde yönetimini pipe'a bırakıyoruz. Sonrasında da FlushAsync metoduyla yazdığımız verinin Reader tarafından erişilebilir olmasını sağlıyoruz.

Şimdi de Reader tarafına bakalım.

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
 
        ReadOnlySequence<byte> buffer = result.Buffer;
        SequencePosition? position = null;
 
        do
        {
            position = buffer.PositionOf((byte)'\n');
 
            if (position != null)
            {
                ProcessLine(buffer.Slice(0, position.Value));
 
                buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
            }
        }
        while (position != null);
 
        reader.AdvanceTo(buffer.Start, buffer.End);
 
        if (result.IsCompleted)
        {
            break;
        }
    }
 
    reader.Complete();
}

Okuma kısmına gelince ReadAsync metodundan dönen ReadResult içindeki bufferın tipinin ReadOnlySequence olduğunu görüyoruz. ReadOnlySequence tipi bufferları linked list olarak tutan yapı. Daha da açık anlatmak gerekirse Memory<T> instancelarını birbirine bağlayan bir struct. Sonrasında bu liste üzerinde satır sonunun pozisyonunu buluyoruz. Bulduktan sonra da Slice metoduyla ilgili memory alanını ProcessLine metoduna parametre geçiyoruz. Span ve Memory tiplerinden hatırlayacağımız üzere Slice operasyonunda herhangi bir allocation vs.. olmadığı için bu işlem oldukça hızlı ve etkin olarak yapılıyor. AdvanceTo metoduylada pipe üzerinden ne kadar data consume ettiğimizi bildiriyoruz. Burada dikkat etmemiz gereken nokta da AdvanceTo metodunu çağırdıktan sonra önceki Read operasyonuyla gelen bufferı kullanmamak. Bu noktada o buffer artık tekrardan Pipe'ın yönetimine giriyor. Artık yapmamız gereken tekrardan ReadAsync metodunu kullanarak Pipe'tan yeni datayı okumak.

Pipeların ASP.NET Core İçerisinde Kullanımı

Pipeların ilk olarak ortaya çıktığı yer Kestrel web serverı. Kestrel ekibi web serverın performansını arttırmak için Pipe implementasyonunu yapıyor ve sonrasında da bu implementasyon olgunlaştıktan sonra frameworkün içerisine ekleniyor.

Şu anda ASP.NET Core içerisinde pek çok noktada pipelar streamlerin yerine kullanılıyor. Request bodysini okumak istediğimizde Request.Body stream dönerken Request.BodyReader PipeReader dönmekte. Aynı şekilde Response.Body yine stream dönerken Response.BodyWriter PipeWriter dönüyor. Bu nedenle daha performanslı ve etkin implementasyonlar için Pipe versiyonlarını kullanmamızda fayda var.

Kaynak kodlar vs.. gibi konularda buradaki makaleden faydalandım. İlgilenmek isteyen linke tıklayıp daha detaylı bilgi alabilirler.

Bir sonraki yazıda görüşmek üzere,