Gökhan Gökalp

MassTransit Saga State Machine ile Model Workflow’u Oluşturmak

MassTransit Saga State Machine ile Model Workflow’u Oluşturmak

Merhaba arkadaşlar.

Bir süredir gerek yoğun iş temposu gerekse de sosyal hayatımdaki bazı yoğunluklardan dolayı, yeni bir makale yazmaya fırsat bulamamıştım. Sizlerde fark ederseniz bir süredir makale konularımı microservice ve messaging yapıları üzerine yoğunlaştırdım.

Bu makale içeriğinde ise geliştiriyor olduğumuz microservice ve messaging yapılarında, long-running business process işlemlerinde consistency’i MassTransit Saga State Machine implementasyonu ile nasıl sağlarız ve nasıl bir model workflow’u oluştururuz gibi konulara, araştırmalarım sırasında edinebildiğim bilgiler doğrultusunda değinmeye çalışacağım.

16631677-abstract-word-cloud-for-distributed-transaction-with-related-tags-and-terms

Dilerseniz öncelikle Saga kavramını biraz açalım.

Nedir Bu Saga?

Saga pattern’ı, birden çok sistem ile ilgili bir collaborating söz konusu olduğunda ve herhangi bir failure anında backtrack veya corrective bir aksiyon alınabilmesini sağlayan bir patttern’dır.

Farklı bir tanımla ise long-running business process’lerde consistency’i sağlayabilmek için distributed transaction kullanım mekanizması olarak da tanımlayabiliriz. Saga kavramı, Hector Garcaa-Molrna ve Kenneth Salem tarafından Sagas isimli akademik çalışması ile ele alınmıştır.

Bir önceki “MassTransit kullanarak RabbitMQ ile Messaging Altyapısı Oluşturma” makalesinde gerçekleştirdiğimiz örnek senaryoyu hatırlarsak eğer:

order-mq

Yukarıdaki gibi basit bir e-commerce sistemindeki sipariş işlemini örnek vermiştik.

Burada yer alan “OrderService” ile “IOrderCommand” tipindeki message’ları consume ederek, farklı işlemler ve event’lar gerçekleştirmiştik. Sonrasında ise sistemin “XCommand” message’ına göre işlem yapacak olan “XService” lere sahip olabileceğinden de bahsetmiştik. Gelin buraya order işlemi ile ilgili bir kaç yeni adım daha ekleyelim.

saga-order

Order senaryosuna “BillingService” ve “FraudService” ide dahil ettik.

Order işleminin tamamlanabilmesi için event’lar sequentially veya parallel olarak distributed bir şekilde bu service’ler arasında işlenmektedir. Peki bunar gibi -n kadar daha service’i sisteme dahil ettiğimizi düşünelim. Peki şimdi birbirleri ile related olan bu service’ler aralarındaki message flow’u, nasıl manage edilebilecek? Bunlara ek olarak birde gerçekten uygulamaları microservice olarak geliştiriyorsak ve order işlemi gibi belli step’lerin sonucunda bir artifact oluşturan süreçlere de sahipsek, transaction bütünlüğü reliable bir şekilde nasıl sağlanacak? Çünkü bazı event’lar başarılı bir şekilde process edilebilirken bazılarının ise fail olabilme durumu mevcut olacaktır. Bu tarz işlemler ise tutarsız(inconsistent) sonuçlar doğuracaktır.

İşte bu tarz süreçler için MassTransit framework’ü içerisindeki Saga State Machine ile bir workflow oluşturacağız. Bu implementasyon ile temelinde birden çok iş akışlarının birden çok sisteme dağılmasında, herhangi bir iş aşamasında meydana gelebilecek olan aksi bir eyleme karşı önlem(compensation) alınabilmesini sağlayacak yapılar yazabilmek mümkündür.

Peki Saga bunu nasıl sağlıyor?

saga_compensation

Yukarıdaki resimde order işlemi gibi long lived business process’in olduğunu düşünelim ve burada, transaction T4 durumundan C4 durumuna geçerken bir hata alındığı için tekrardan T3 durumundan T4 durumuna bir compensation sağlandığını görüyoruz.

Implementasyon kısmına başlamadan önce, sizlere bir kaç tamamlayıcı kavramdan bahsetmek istiyorum. Öncelikle model workflow’u oluşturmak, bir saga değildir.  Workflow state’den bağımsızdır ve State Machine’in aksine activity‘ler ile ilgilenir. Workflow üzerindeki transition’lar ise, herhangi bir action tamamlandığında gerçekleşir. Workflow, State Machine ile beraber tanımlanabilir ve iş akışları belirli state’ler doğrultusunda ilerletilebilinir.

State Machine Nedir?

State Machine, bir object’in bir state’den bir diğer state’e  transition’ı ile ilgilenir. State Machine bu işlemi ise saga object’ine gelen bir request’in state’ini, bir başka yerde correlation id ile beraber persist ederek handle eder. Sonrasında ise o correlation id’ye bağlı olarak gerçekleşen event’lar ile object state’ini yönetir.

Bu yapı ise long-running business process’lerde state koordinasyonunu merkezileştirmeyi ve tutarlılığını sağlamaktadır. Açıkçası biraz karmaşık bir konu fakat ilgili business’a bağlı olarak uygulandığında, taşlar yerlerine kendiliğinden oturuyor.

MassTransit Saga State Machine ile Model Workflow’u Implementasyonu

Gerçekleştirecek olduğumuz örnekte daha önce de bahsettiğim gibi bir önceki “MassTransit kullanarak RabbitMQ ile Messaging Altyapısı Oluşturma” makale konusu üzerinden devam edeceğiz. Bu örnek üzerinde bir kaç noktayı refactor edeceğiz ve ardından MassTransit Saga implementasyonunu ekstra bir “BillingService” i daha ekleyerek, bir model workflow’u oluşturacağız.

Refactor edeceğimiz ilk nokta hatırlarsak ilk message’ı produce ettiğimiz nokta “LightMessagingCore.Boilerplate.OrderUI” projesi idi.

Burada “OrderController” içerisinde initialize ettiğimiz bus üzerinden “OrderQueue” ya bir “IOrderCommand” produce ediyorduk.

Bu noktada artık ilgili command’ı direkt olarak “OrderQueue” ya produce etmek yerine, “SagaQueue” ya produce edeceğiz. Bunun için “sendToUri” kısmını aşağıdaki gibi güncelleyelim.

Çünkü bir Saga State Machine tanımlayarak, bir model workflow’u oluşturacağız. Ilk produce olan command ile birlikte bir state oluşturacağız ve correlation’ı sağlayarak ilgili process’leri MassTransit State Machine’i üzerinden devam ettireceğiz. Bu yüzden command’ın ilk produce olacağı nokta, Saga Queue’su olacaktır.

URI’ı güncelledikten sonra “Web.config” e gelerek, “SagaQueueName” key’ini aşağıdaki gibi tanımlayalım.

“LightMessagingCore.Boilerplate.OrderUI” projesi içerisinde yapacaklarımız bu kadar.

Sagas implementasyonuna geçmeden önce ihtiyaç duyacağımız messaging contract’larını, “LightMessagingCore.Boilerplate.Messaging” projesi altında aşağıdaki gibi tanımlayalım.

“IOrderReceivedEvent” ını “OrderService” içerisinde, order sagas üzerinden bir message produce edildiğinde consume edebilmek için kullanıyor olacağız.

“IOrderProcessedEvent” ını ise “OrderService” içerisinde consume edilen message’ı business requirements doğrultusunda handle ettikten sonra, faturalandırılma işlemleri için “BillingService” e bir event fırlatabilmek için kullanacağız.

“LightMessagingCore.Boilerplate.OrderService” projesine gidelim ve “OrderReceivedConsumer” class’ını aşağıdaki gibi refactor edelim.

Bir önceki örneğimizden farkı ise artık “IOrderCommand” yerine “IOrderReceivedEvent” larını consume ediyoruz ve ilgili business işlemleri gerçekleştirdikten sonra, geriye bir “IOrderProcessedEvent” ı publish ediyoruz.

Faturalandırma işlemleri için ise business’a “BillingService” i dahil edeceğimizi söylemiştik. Solution üzerine “LightMessagingCore.Boilerplate.BillingService” isminde yeni bir console application daha ekleyelim. Ekleme işleminin ardından “System.Configuration”, “LightMessagingCore.Boilerplate.Common” ve “LightMessagingCore.Boilerplate.Messaging” library’lerini referans olarak ekleyelim. Son olarak da Nuget Package Manager üzerinden “MassTransit.RabbitMQ” paketini kuralım. Artık consumer kısmını kodlamak için hazır durumdayız.

“OrderProcessedConsumer” isminde bir class ekleyelim ve aşağıdaki gibi kodlayalım.

Bu kısımda daha önceki örnekte de olduğu gibi, yeni oluşturmuş olduğumuz “IOrderProcessedEvent” ını consume ediyoruz ve ilgili business işlemlerini burada gerçekleştiriyoruz. Oluşturduğumuz consumer’ı “Program.cs” de aşağıdaki gibi initialize edelim.

Console application üzerinden host olacak olan bus, “OrderQueueName” üzerinden gelecek olan receive endpoint’e göre consume işlemlerini “OrderProcessedConsumer” üzerinden gerçekleştirecektir.

“App.config” bilgisini de aşağıdaki gerçekleştirelim.

“BillingService” ide “OrderService” den sonraki business step’i olarak sisteme dahil ettik.

Şimdi saga state machine ile birlikte workflow kısmını oluşturmaya başlayabiliriz.

Solution’a “LightMessagingCore.Boilerplate.Saga” isminde yeni bir console application daha ekleyelim. Ekleme işleminin ardından “System.Configuration”, “LightMessagingCore.Boilerplate.Common” ve “LightMessagingCore.Boilerplate.Messaging” library’lerini referans olarak ekleyelim. Referans ekleme işlemini tamamladıktan sonra Nuget Package Manager üzerinden “MassTransit.RabbitMQ” ve “MassTransit.Automatonymous” paketlerini kuralım.

Tüm işlemleri tamamladıktan sonra şimdi “OrderSagaState” isminde yeni bir class oluşturalım ve “SagaStateMachineInstance” interface’ini implemente edelim.

Bu class order’ın state’inin tutulacağı class dır. “CorrelationId” property’sine message’ları business flow’u sırasında correlate edebilmek için ihtiyaç duymaktadır Saga State Machine. “CurrentState” property’si ile ise mevcut state tutulmaktadır ve type’ını “Automatonymous” namespace’i altından almaktadır. Geriye kalan “OrderId” ve “OrderCode” gibi property’ler ise, state’ini tutacağımız message’ın property’leridir.

“OrderController” ı refactor ederken artık message’ları direkt olarak “SagaQueue” ya produce edeceğimizden ve initialize işlemini saga tarafından gerçekleştireceğimizi söylemiştik. Saga içerisinden ilgili message command’ı için state ve correlation işlemlerini gerçekleştirdikten sonra order business’ının gerçekleşebilmesi için “OrderService” e bir “IOrderReceivedEvent” ı publish edeceğimizi söylemiştik. Bunun için yine “LightMessagingCore.Boilerplate.Saga” projesi içerisine “Messages” klasörü ekleyelim ve içerisine “OrderReceivedEvent” isminde bir event tanımlayalım ve “IOrderReceivedEvent” interface’ini implemente edelim.

Bu event içerisinde constructor injection aracılığı ile bir “orderSagaState” alıyoruz ve order business’ının gerçekleşebilmesi için “OrderService” e publish edeceğimiz property’leri tanımlıyoruz.

“OrderSaga” yı oluşturmak için artık hazırız. Burada önce order’ın business gereği sahip olabileceği state’leri tanımlayacağız ve ardından gerçekleşebilecek event’ları tanımlayıp, bir workflow oluşturacağız. Öncelikle “OrderSaga” isminde bir class ekleyelim ve “MassTransitStateMachine<T>” class’ını aşağıdaki gibi implemente edelim.

“MassTransitStateMachine<T>” class’ına implemente ederken type olarak state tutmak için oluşturmuş olduğumuz “OrderSagaState” i geçiyoruz.

Örnek senaryomuz gereğince order, ilk “OrderService” e düştüğünde bir “Received” statüsüne ve ilgili order business’ı gerçekleştirildikten sonra da bir “Processed” statüsüne sahip olabilir. Bu yüzden “State” type’ına sahip, “Received” ve “Processed” property’lerini tanımlıyoruz. Event olarak ise ilk message “IOrderCommand” interface’i ile “OrderUI” tarafından produce edildiğinde, şuan oluşturuyor olduğumuz saga tarafından consume edilecektir. Bu yüzden event olarak bir “OrderCommand” bulunmaktadır. “OrderService” tarafından ilgili işlemler gerçekleştirildikten sonra publish edilen bir diğer event olarak da “IOrderProcessedEvent” bulunmaktadır. Bunun için birde “OrderProcessed” event’ı tanımlıyoruz.

Şimdi “OrderSaga” nın constructor’ı içerisinde initial state’i tanımlayarak, state machine üzerine event’ları aşağıdaki gibi tanımlayalım.

“InstanceState” method’u ile “OrderSagaState” i, state property’si üzerinden initialize ediyoruz. Ardından “Event” method’u ile içerisinde sıralaması önemli olarak ilk gelecek olan event’in “OrderCommand” olacağını ve bu message flow’unu, state’in “OrderCode” property’si üzerinden(unique olması önemli) correlate edileceğini söylüyoruz. Buradaki “state” state’ini tutmak için oluşturmuş olduğumuz yeni “OrderSagaState” objesidir ve “context” expression’ı ise o an consume edilen “IOrderCommand” event’ıdır.

“SelectId” method’u ile ise correlation işlemleri için bir id seçimi yapıyoruz. Bu işlem için eğer kullanabileceğiniz unique bir correlation id’niz yok ise, yeni bir guid oluşturup gerçekleştirmek mümkündür. Diğer stepler arasında workflow tarafından bu correlation id otomatik olarak set edilecektir. Bir diğer event olan “OrderProcessed” ı tanımlıyoruz ve correlation işleminin “Message” üzerindeki “CorrelationId” den sağlanacağını belirtiyoruz.

Gerekli event tanımlamalarını da tamamladıktan sonra, şimdi workflow için activity’leri tanımlamaya başlayabiliriz.

“Initially” method’u ile ilk başta ne işlemi gerçekleştireceğimizi tanımlayacağız. Bunun için fluent bir şekilde “When” ve “Then” method’larını kullanıyoruz. Yani bir “OrderCommand” geldiğinde sonra “context.Instance” a, consume edilen data üzerindeki değeri alması gerektiğini söylüyoruz. Bu noktada “context.Instance” state’i tuttuğumuz “OrderSagaState” objesi oluyor. “ThenAsync” method’u ile yapmak istediğiniz farklı business işlemleri var ise async bir şekilde burada gerçekleştirebilmek mümkündür.

Bunlara ek olarak fluently bir şekilde kullanabileceğiniz, bazı harika method’lar da mevcuttur. Örnek vermek gerekirse conditional işlemler gerçekleştirebilmek için “If” method’u, saga kısmına compensation chain oluşturabilmek için “Catch” method’u gibi bazı method’lar mevcuttur. Bunlara ek olarak state bağlımlı scheduled işlemler de gerçekleştirebilmek için “Schedule” method’uda vardır.

“Initially” method’unda yukarıdaki son iki satırda da, “TransitionTo” method’u ile state’in artık “Received” olduğunu ve ardından “Publish” method’u ile işlenmek üzere order command’ı bir “OrderReceivedEvent” olarak queue’ya fırlatıyoruz.

Artık order saga’nın ilk başta ne yapması gerektiği hazır durumdadır. Artık workflow üzerine diğer activity’leri de tanımlamaya başlayabiliriz. “During” method’u ile objenin belirtilen state’e sahip olduğunda gerçekleşmesini istediğimiz işlemleri tanımlıyoruz. Biz ise örnek senaryomuz gereği obje “Received” state’ine sahipse ve “BillingService” için “OrderProcessed” event’ı yakalandıysa, console ekranına ilgili işlem bilgisini yazmasını söylüyoruz. Yani bu kısımda “BillingService” devreye girip çalışmaya başlamasıdır. ve “Finalize” method’u ile ilgili state’i tamamlıyoruz.

Tüm bu işlemlerin sonunda artık yukarıdaki satır ile, ilgili saga state instance’ının sonlandığını söylüyoruz ve MassTransit tarafından artık bu instance bilgileri repository üzerinden otomatik olarak silinecektir. (bu kısma birazdan değineceğim)

Artık bir order workflow’una ve state machine’ine sahibiz. Örnek senaryomuz gereği her bir servis ilgili işlemini process ederken, saga console’u üzerinden gelen bilgilerini görebiliyor olacağız.

Şimdi “Program.cs” e gelelim ve burada saga host’unu aşağıdaki gibi initialize edelim.

Burada öncelikle az önce yukarıda bahsetmiş olduğum repository kavramından bahsetmek istiyorum. MassTransit ilgili state’leri repository’de tutmaktadır. Ben örnek gereği “InMemorySagaRepository<T>” ile state işlemlerimi isminden de anlaşılabileceği gibi in memory olarak gerçekleştirdim. Bu işlemleri database üzerinde entityframework repository’si ile de gerçekleştirebilmek mümkündür. Bu işlemlerin ardından diğer host’lardan tek farkı ise, “Consumer” method’u yerine “StateMachineSaga” method’unu kullanmamızdır.

Test işlemlerinden önce”App.config” bilgilerini aşağıdaki gibi güncelleyelim.

Dilerseniz şimdi solution’ın multiple startup project kısmına “LightMessagingCore.Boilerplate.BillingService” ve “LightMessagingCore.Boilerplate.Saga” yı da dahil edelim ve start’a basalım.

order_saga_ui_

Açılan “OrderUI” üzerinden yukarıdaki bilgileri girelim ve “Create” butonuna basalım.

saga-state

Dikkat ederseniz “IOrderCommand” message’ı ilk olarak “Saga” ya geldi ve “IOrderReceivedEvent” ı olarak publish edildi. Ardından “OrderService” publish edilen event’ı handle ettikten sonra “IOrderProcessedEvent” ını publish etti. “BillingService” de ilgili event’ı consume ederek, handle işlemini gerçekleştirdi.

Her bir step, state’ler doğrultusunda gördüğümüz gibi saga workflow’u üzerinden geçerek gerçekleşti. Sizlerde ihtiyaçlarınız doğrultusunda workflow üzerinde activity’leri tanımlarken, gerçekleştirmek istediğiniz eylemleri de tanımlayabilirsiniz.

Bir sürelik aradan sonra umarım faydalı bir makale içeriği olmuştur. Örnek uygulamanın tümüne aşağıdan erişebilirsiniz.

https://github.com/GokGokalp/lightmessagingcore-boilerplate-with-saga

Referanslar:

http://automatonymous.readthedocs.io/en/latest/overview/saga.html?highlight=StateMachineSaga
http://vasters.com/archive/Sagas.html
http://arnon.me/2013/01/saga-pattern-architecture-design/
https://medium.com/@roman01la/confusion-about-saga-pattern-bbaac56e622#.npnnb9una

Bu makale toplam (266) kez okunmuştur.

5
0



Bir Cevap Yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

*