İçeriğe geç →

C# ile RabbitMQ Client’ı kullanarak Publisher ve Consumer Yapısı

Merhaba arkadaşlar.

icons--rabbitmq-200x200

Bir önceki makalem olan “RabbitMQ Nedir ve Windows’a Kurulumu” isimli makale ile, RabbitMQ konusuna bir giriş yapmış idik. Bu makale kapsamında ise RabbitMQ’nun C# provider’ını kullanarak en sık kullanılan Messaging Pattern‘lerinden birisi olan “Publish / Subscribe” pattern’i mantığında, basit bir şekilde nasıl mesaj gönderilir ve alınıra bakıyor olacağız.

RabbitMQ’nun Procuder, Queue ve Consumer yapısını basit olarak hatırlamak gerekirse;

ProducerQueueConsumer

  • Producer: Queue’ya mesaj gönderen uygulamadır. Yani Publisher’ımız.
  • Consumer: Queue’daki mesajları dinleyecek olan uygulamamızdır.

Genel bilgileri tekrardan hatırladığımıza göre şimdi kodlamaya başlayabiliriz. Örnek olarak bir Console Application oluşturacağız ve Queue’ya mesaj gönderecek olan Publisher’ı ve bu mesajları dinleyecek olan Consumer yapısını oluşturacağız.

Öncelikle Nuget Package Manager üzerinden aşağıda görmüş olduğumuz “RabbitMQ.Client” paketini kuralım.

rabbitmq-nuget

Kurmuş olduğumuz bu client paketi sayesinde RabbitMQ ile haberleşebileceğiz.

RabbitMQ’nun C# Client provider’ı connection işlemlerini “ConnectionFactory” üzerinden yaratarak gerçekleştirmektedir. Bu connection’a hem “Publisher” hemde “Consumer” içerisinde aynı şekilde ihtiyacımız olacağı için, “RabbitMQService” isminde bir class ekleyelim ve burada “ConnectionFactory” class’ını kullanarak ortak işlemleri kodlamaya başlayalım.

RabbitMQService class’ımız hazır durumda. Connection işlemleri “IConnection” interface’inden türemektedir ve factory class’ı üzerinden “HostName” property’sini set ederek, “CreateConnection()” method’u aracılığı ile yeni bir connection oluşturulabilmektedir.

Şimdi “Publisher” görevini görecek olan Publisher class’ını kodlamaya geçelim.

Şimdi neler yaptığımıza adım adım bir bakalım.

Yukarıdaki satır ile açmış olduğumuz connection üzerinden “CreateModel” method’unu çağırarak, RabbitMQ üzerinde yeni bir channel/session yaratmaktayız. Bu Channel sayesinde bir Queue oluşturabilirken, mesaj gönderme işlemlerini de gerçekleştirebilmekteyiz.

Bu satırda ise method isminden de anlaşılabileceği üzere, yeni bir queue tanımlıyoruz. Burada önemli olan ilk üç parametresine bakmak gerekirse;

  • queue: Oluşturulacak olan Queue’nun ismi.
  • durable: Bu parametre ile in-memory olarak çalışan Queue disk üzerinden çalışmaya başlayacaktır. Bu sayede RabbitMQ servisi dursa bile Queue kaybolmayacaktır. Her güzelliğin getirdiği bir kötü tarafın olduğu gibi bununda beraberinde getireceği latency problemi bulunmaktadır haliyle.

Artık Channel ve Queue hazır bir durumda olduğu için, “BasicPublish” method’u ile kolay bir şekilde oluşturmuş olduğumuz ilgili Queue’ya mesaj gönderiyoruz.

“BasicPublish” method’un kullandığımız parametrelerine bakarsak:

  • exchange: Bu parametreyi es geçiyoruz. Exchange genel olarak mesajı ilgili Routing Key’e göre ilgili Queue’ya yönlendiren bölümdür. Direct Exchange, Fanout Exchange ve Topic Exchange gibi tipleri bulunmaktadır. Bunları bir sonraki makalemde detaylı olarak ele alacağım.
  • routingKey: Burada girmiş olduğumuz key’e göre ilgili Queue’ya yönlendirilecektir mesaj.
  • body: Queue’ya göndermek istediğimiz mesajı byte[] tipinde gönderiyoruz.

Publisher’ı test edebilmek için “Program.cs” class’ını açalım ve aşağıdaki gibi kodlayalım.

Queue ismini “GOKHANGOKALP” olarak belirleyelim ve constructor üzerinden Queue ismini ve göndermek istediğimiz mesajı set edelim. Uygulamayı çalıştırmadan önce RabbitMQ Management ekranı üzerinden Queues tab’ına bir bakalım. Management ekranına “localhost:15672” adresinden erişebilirsiniz.

rabbitmq-management-queue

Yukarıda gördüğümüz gibi şuan herhangi bir Queue bulunmamaktadır. Şimdi oluşturmuş olduğumuz Console uygulamasını çalıştıralım.

rabbitmq-publisher-console

Publisher Console uygulaması üzerinden başarılı bir şekilde çalıştı ve “GOKHANGOKALP” isminde bir Queue oluşturarak “Hello RabbitMQ World!” mesajını Queue’ya ekledi. Management ekranını bir de şimdi kontrol edelim.

rabbitmq-publisher-queues

Tada… “GOKHANGOKALP” isminde bir Queue oluşmuş ve “Read” state’inde bir mesajın bulunduğunu söylüyor. Şimdi sıra geldi bu mesajları alacak yani Subscribe edecek olan Consumer’ı kodlamaya. “Consumer” isminde yeni bir class oluşturarak, aşağıdaki gibi kodlamaya başlayalım.

Consumer kısmında adım adım ne yaptığımızı inceleyelim şimdi.

Asıl işlemlerimizi gerçekleştirecek oldıuğumuz “consumer” nesnemizdir. “EventingBasicConsumer” class’ı sayesinde constructor üzerinden channel verildiğinde, “Recevied” event’i sayesinde sürekli “listening” modunda olacaktır. Queue’daki ilgili mesajları sırasıyla almaktadır ve “Body” property’sinde barındırmaktadır. Hatırlarsak mesajları “byte[]” tipinde göndermiştik. Göndermiş olduğumuz mesajın “string” tipinde olduğunu bildiğimiz için “Encoding” altında bulunan “GetString” method’u ile “Decode” işlemini gerçekleştiriyoruz.

NOT: RabbitMQ Queue’su FIFO(First in Frist out) mantığında çalışmaktadır.

Burada ise method isminden yine anlaşılabileceği üzere (isimlendirmenin ne kadar önemli olduğu buradan da anlaşılabilir) basic bir şekilde verilmiş olan Queue ismine göre mesajları alma işlemini başlatıyoruz. Burada dikkat çekmek istediğim bir kaç parametre bulunmaktadır:

  • queue: Hangi Queue’nun mesajları alınacak ise.
  • noAck: True olarak set edildiği taktirde, consumer mesajı aldığı zaman otomatik olarak mesaj Queue’dan silinecektir. Eğer Queue üzerinden silinmesini istemiyor iseniz, False olarak set etmeniz gerekmektedir.

Console uygulamasının “Program.cs” class’ını şimdi aşağıdaki gibi güncelleyelim ve tekrardan çalıştıralm.

Publisher’ı aynı şekilde başlatarak tek fark olarak Consumer’ı tanımlıyor ve aynı “_queueName” parametresini geçerek, aynı Queue üzerindeki mesajları dinlemesini söylüyoruz.

Bunun sonucunda Console uygulamasının çıktısı aşağıdaki gibi olacaktır.

rabbitmq-publisher-and-consumer

Gördüğümüz gibi sonuç olarak Consumer başarılı bir şekilde “GOKHANGOKALP” Queue’sunu dinleyerek, içerisinde bulunan mesajı çekip ekrana basmıştır. “noAck” parametresini True olarak set etmiş idik. Şimdi Management ekranını bir kontrol edelim bakalım Queue üzerinde herhangi bir mesaj var mı?

rabbitmq-management-queue2

Gördüğümüz gibi “GOKHANGOKALP” Queue’su olduğu gibi durmaktadır fakat herhangi bir mesaj içerisinde bulunmamaktadır. Şimdi “noAck” parametresini False olarak set edip, tekrardan Console uygulamasını çalıştıralım ve sonuca tekrardan Management ekranı üzerinden bir bakalım.

rabbitmq-management-queue3

“noAck” parametre değeri False olduğundan dolayı Consumer mesajı çekmesine rağmen, mesaj Queue üzerinde “Ready” state’inde durmaktadır. Queue üzerindeki mesajları silme gibi kararları, business rule’larımıza bağlı olarak bizlerin ele alması daha doğru bir karar olacaktır.

Bu makalemde basic bir şekilde RabbitMQ’nun C# Client’ını kullanarak, mesaj işlemlerini nasıl handle edebileceğimizi hep beraber inceledik. Bir sonraki RabbitMQ serimde görüşmek dileğiyle.

Sağlıcakla kalın.

RabbitMQSample

Bu makale toplam (4222) kez okunmuştur.

42
0



Kategori: RabbitMQ

11 Yorum

  1. Akansu Akansu

    Saolasın. Lakin bir windows servisi ile örneklemek daha hoş olurdu. 🙂

  2. Halit Halit

    Merhaba gökhan hocam. Rabbitmq konusunda mesajların default olarak memory üzerinde tutulduğunu okudum. Ancak gerçek bir senaryoda Rabbitmq sunucusunun kapanıp açıldığı durumda mesajların kaybolması söz konusu oluyor. Tüm mesajların diske yazılması da yoğun mesajlaşmada performansa etki eder mi? Bu gibi sorunlarda nasıl bir strateji uygulanır. Sonuçta herkesin verisi önemlidir ve kimse memory üzerinde kritik bilgi saklamak istemez.

    • Merhabalar, evet default olarak in-memory çalışmaktadır. Persistent’ı sağlayabilmek için ise “durable” ayarlarına bakabilirsiniz disk üzerine yazabilmek için. Detaylı bilgiye buradan erişebilirsiniz. Gerçek bir senaryoda ise bu sizin business’ınıza ve ihtiyaçlarınıza göre farklılık gösterir. Elbette in-memory kadar performanslı olmayacaktır disk’e yazılması. Disk’e yazma ihtiyacınızıda, iyi belirlemeniz gerek ne kadar yoğun bir trafik altında çalışacak ve boyutları ne olacak vb. Çünkü bu sadece asenkron yapılabilecek küçük işlemler için bir mesajlaşma yapısı ve o yüzden kritik bilgiler de içermez. Sadece bir yere yazılmış işi alır, ilgili yere yönlendirir (aktarır).

  3. merve yılmaz merve yılmaz

    Gökhan Hocam Merhaba.RabbitMQ makale serisi için teşekkürler,çok faydalı oldu.Publisher ile RabbitMQ ye 1000 in üzerinde kayıt atıp,tüm kayıtlar için Consumer i çalıştırdığım zaman RabbitMQ da Ready statusündeki tüm kayıtlar 0 olarak gözüktü ve Consumer kayıtları teker teker
    işlemeye başladı.Fakat ben Consumer u durdurunca işlenmemiş tüm kayıtlarda kayboldu.RabbitMQ da bu şekilde bir birikme olduğunda Consumer ın kayıtları teker teker işleyemeyip yarıda kesilirse nasıl bir yol izlemek gerekir?autoAck parametresini false set ettiğimiz zamanda Consumer ı yarıda kesince tüm kayıtlar tekrar Ready statusüne geçti.MQ nun mantığında kayıtları sırasıyla okunup silinmesi gerekmiyor mu?RabbitMQ da bunu biz mi yönetmeliyiz? Teşekküler

    • Merhaba, eğer pure rabbitmq’nun client’ı ile ilerlerseniz, reliability konularını sizin yönetmeniz gerekmektedir, yani ack bilgilerini. Sırasıyla okunup, başarılı bir şekilde işlenir ve ack bilgisi gönderilirse silinir, ack bilgisi gönderilemez ise, tekrardan ready moda geçecektir. Dilerseniz MassTransit ile ilgili olan makalelerime bir göz atın, reliability konularını handle etmektedir.

      • merve yılmaz merve yılmaz

        Merhaba,MQ konularına yeni bakıyorumda ActiveMQ yu da kurup denedim ve ActiveMQ da consumer mesajları tek tek almakta ve okuduğunuz mesaj otomatik olarak silinmektedir.Bu konuyuda araştırıyorum ama sizede sormak istedim.
        RabbitMQ da mesajları tek tek alıp consumer ı çalıştırabilir miyiz?ack bilgisini gönderebiliyoruz ama gördüğüm kadarıyla kuyruktaki tüm bilgiler için bu bilgi gönderilmektedir ve kuyrukta birden fazla kayıt varsa
        consumer durduğu zaman bunları ya tamamı ready durumuna geçmekte ya da consumer durdurduğunuzda işlenmemiş kayıtlar kaybolmaktadır.Producer ile consumer aynı anda değilde Producer ile belirli bir kayıt aktarıp
        sizin örnek kodunuz üzerinden Consumer ı çalıştırınca kuyruktaki tüm mesajları tek tek değilde tamamını almakta ve daha sonra tek tek kayıtlar işlenmektedir.
        RabbitMQ nun çalışma mantığı mı böyle,ben bunları sırası ile işleyip işlem yaptığım kaydı silmesi için tekrar tek bir kayıt için ack bilgisimi göndermem gerekiyor?

        • Bahsettiğiniz işlem için “Channel Prefetch Setting” ayarlarını yapmanız gerekmektedir consumer içerisinde. Detaylı bilgi için şu noktaya bakabilirsiniz: https://www.rabbitmq.com/consumer-prefetch.html Fakat, yüksek miktarda işlemler yapıyorsanız, consumer utilization’ını düşürecektir.

  4. Furkan Furkan

    BrokerUnreachableException was unhandled Hatası
    An unhandled exception of type ‘RabbitMQ.Client.Exceptions.BrokerUnreachableException’ occurred in RabbitMQ.Client.dll

    Additional information: None of the specified endpoints were reachable

    Hocam burada böyle bir hata alıyorum localde çalışmıyorum kullanıcı adı ve şifreyi tanımlamama rağmen böyle bir hata veriyor sebebi nedir ?

    • Merhaba, host bilgisinin port bilgisi ile doğru olduğundan emin misiniz?

  5. Oğuz Oğuz

    Merhabalar Gökhan hocam. Size bir kaç sorum olacak. Öncelikle yazılarınız için teşekkür ediyorum çok yardımcı oluyor. Sorumun ilki şudur. Bu yapıyı yani kuyruk yapılarını servis (rest servis) yapılı bir mimaride nereye oturtmam lazım? Ben de her modül için bir servis var. O servis içine o modüle özgü talepleri alıyor. Yani Token yapısı gibi, talepte bulunan kullanıcı bilgisi gibi. Bu durumda MQ yapımı nereye kurmam lazım? Servisin önüne koyup talepleri servise iletmesi mi yoksa servisim talepleri alıp MQ yapısına mı iletmesi?

    Buna bağlı olarak 2. sorum da şu olacaktır. Birden fazla server kullanıp yük dağılımı yapmak mı yoksa MQ yapısı kullanmak mı? Yoksa ikisi bir arada mı? Eğer ikisi bir arada olacaksa her server için ayrı bir MQ yapısı mı kurmam gerekli bunlar nasıl ortak çalışacak?

    Cevap için şimdiden teşekkürler.

    • Merhaba, kusura bakmayın geç cevap için.

      1) REST servis’leriniz deki iletişimi kuyruk yapıları ile, birbirlerinden decoupled hale getirebilmekte, asenkronlaştırabilmekte gibi konularda kullanabilirsiniz. Daha iyi scale olabilen, flexible servisler elde edebilirsiniz. Token yapınızda herhangi bir MQ ya gerek olduğunu düşünmüyorum. Token alma işlemleri genelde senkron ilerlemektedir. Belki login olan kişileri log tutmak istersiniz, o gibi durumlarda token veren servisiniz içerisinde bir event fırlatarak log tutarsınız ozaman ihtiyaç olur.

      2) İkisi bir arada. MOM dediğimiz Message Oriented Middleware tüm ekosisteminiz içinde tektir ve uygulamalarınız arası iletişimi sağlar.

      Teşekkürler.

Bir cevap yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

*

This site uses Akismet to reduce spam. Learn how your comment data is processed.