İlkay İlknur

.NET Core'da Channels Kullanımı

Haziran 25, 2020

Herkese Selamlar,

Bu yazıda System.Threading namespace'i altına bulunan Channel<T> tipini inceleyeceğiz. .NET Core 2.1 ile beraber gelen bu tip, en basit anlatımla uygulamalarımız içerisinde bir noktadan bir başka noktaya veri gönderebilmemizi sağlanmakta. Bir diğer anlatımla da uygulamalarımızda producer/consumer implementasyonları gerçekleştirirken kullanabileceğimiz bir veri yapısı olarak da düşünebiliriz.

Channel yaratabilmek için Channel tipi içerisinde iki adet statik metot bulunmakta.

  • Channel.CreateBounded<T>(int capacity) : Bu metot ile parametrede belirtilen eleman sayısı kadar eleman taşıyabilecek bir channel yaratılır.
  • Channel.CreateUnbounded<T>(): Bu metot ile de sınırsız sayıda eleman taşıyabilecek bir channel yaratılır.

Her iki metotta da generic parametre olarak channel içerisinde taşıyacağınız data tipini belirtmeniz gerekiyor.

Yukarıdaki metotları kullanarak bir channel yarattıktan sonra channel içerisindeki Reader ve Writer propertyleri içerisindeki metotları kullanarak ilgili işlemleri yapabiliyoruz. Şimdi isterseniz basit bir channel yaratıp kullanımını inceleyelim.

static void Main(string[] args)
{
    var channel = Channel.CreateUnbounded<int>();
 
    //Producer
    _ = Task.Run(async () =>
    {
        int number = 1;
        while (true)
        {
            await channel.Writer.WriteAsync(number);
            number++;
            await Task.Delay(1000);
        }
    });
 
    //Consumer
    _ = Task.Run(async () =>
    {
        while (true)
        {
            var data = await channel.Reader.ReadAsync();
            Console.WriteLine($"Data:{data}");
            await Task.Delay(3000);
        }
    });
    Console.ReadKey();
}

Yukarıda channelı yaratıp sonrasında ilk olarak producer kısmı yazdık. Burada Writer içerisindeki WriteAsync metodunu kullanarak 1.sn aralıklarla datayı channela gönderdik. Sonrasında ise consumer tarafta Reader propertysi içerisindeki ReadAsync metoduyla channeldaki datayı 3'er saniye aralıklarla okuyup console'a yazdırdık.

Kapasite limiti olmayan channelların basitçe kullanımı bu şekilde. Ancak kapasite limiti olan channelların(bounded) kullanımında bazı senaryolar var. Bu senaryolarda etkilenen taraf channela data yazan kısım. Şimdi gelin bu tip durumlarda writer metotların davranışlarını inceleyelim.

  • WriteAsync : Eğer channel içerisindeki eleman sayısı belirtilen kapasite limitine ulaşmadıysa channela datayı yazarak çağıran tarafa geri döner. Ancak eleman sayısı kapasite limitine ulaştıysa bu metot channeldan data okunana kadar bekler ve çağıran tarafa geri dönmez.
var channel = Channel.CreateBounded<int>(1);
await channel.Writer.WriteAsync(1);
//Yeni bir read gelene kadar bekler.
await channel.Writer.WriteAsync(2);
  • TryWrite: Bu metot senkron olarak çalıştığı için eğer channelda yeterli yer varsa yazıp true değeri döner. Yer yoksa doğrudan false değeri döner.

  • WaitToWriteAsync : Bu metot channela herhangi bir değer yazmaz ancak channelda yazmak için yeterli yer olduğunda sizi haberdar eder. Özellikle while loop içerisinde condition olarak bu metot kullanılabilir.

var channel = Channel.CreateBounded<int>(1);
while (await channel.Writer.WaitToWriteAsync())
{
    await channel.Writer.WriteAsync(1);
}

Default olarak bounded channel kullanımında writer tarafta olan değişimler bu şekilde. Ancak bounded channel yaratırken channel içerisinde kapasite dolduğunda channelın davranışını değiştirme imkanımız da var. Bunun için channel yaratırken parametre geçebildiğimiz BoundedChannelOptions içindeki FullMode propertysini kullanabiliyoruz. Bu propertynin alabildiği değerler şunlar.

  • Wait(default): Eğer channel içerisinde kapasiteye ulaşılırsa channeldan data okunana kadar write operasyonu bekler.
  • DropNewest : Channelda kapasite dolduğunda en son eklenen eleman silinerek channelda yer açılır.
  • DropOldest : Channelda kapasite dolduğunda en eski eklenen eleman silinerek channelda yer açılır.
  • DropWrite : Channelda kapasite dolduğunda yapılan write operasyonları yoksayılır.

Küçük Optimizasyonlar

Eğer yarattığınız channelların tek bir readerı veya writerı olacaksa bunları channel yaratırken belirtirseniz arka planda daha etkin ve verimli bir implementasyon kullanılmasını sağlayabilirsiniz.

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
{
    SingleReader=true,
    SingleWriter = true,
});
 
var channel2 = Channel.CreateUnbounded<int>(new UnboundedChannelOptions()
{
    SingleReader = true,
    SingleWriter = true,
});

Channelların C# 8.0 Async Enumerables İle Kullanımı

Channeldan data okurken C# 8.0 ile beraber gelen async enumerables özelliğini kullanabiliyoruz.

Örneğin,

var channel = Channel.CreateUnbounded<int>();
//Producer
_ = Task.Run(async () =>
{
    int number = 1;
    while (true)
    {
        await channel.Writer.WriteAsync(number);
        number++;
        await Task.Delay(1000);
    }
});
 
await foreach (var item in channel.Reader.ReadAllAsync())
{
    Console.WriteLine(item);
}

ReadAllAsync metodu IAsyncEnumerable donüyor ve channelda data olduğu sürece foreach döngüsüyle okuyabiliyoruz. Eğer channelda data yoksa yenisi gelene kadar bekliyoruz ve foreach döngüsü sonlanmıyor. Ancak writer üzerinden Complete metodu çağırılırsa channela daha fazla eleman eklenmeyeceği mesajı verilir ve bundan sonra foreach metodu channeldaki tüm elemanları işledikten sonra sonlanır.

Örneğin,

var channel = Channel.CreateUnbounded<int>();
//Producer
_ = Task.Run(async () =>
{
    int number = 1;
    while (true)
    {
        await channel.Writer.WriteAsync(number);
        number++;
        if (number == 10)
        {
            channel.Writer.Complete();
        }
        await Task.Delay(1000);
    }
});
 
await foreach (var item in channel.Reader.ReadAllAsync())
{
    Console.WriteLine(item);
}
 
Console.WriteLine("Done");
Console.ReadKey();

Channellar .NET Core ve ASP.NET Core içerisinde de belirli yerlerde kullanılmakta. Siz de örneğin asp.net core içerisinde background servislere data gönderme gibi alanlarda bu tipten yararlanabilirsiniz.

Bir sonraki yazıda görüşmek üzere,