Website Logo. Upload to /source/logo.png ; disable in /source/_includes/logo.html

Zuzur’s wobleg

technology, Internet, and a bit of this and that (with some dyslexia inside)

Amusons-nous Avec Grails Et Les Transactions…

| Comments

J’ai posté une question cette nuit sur stack overflow à propos d’un problème bizarre avec grails et les transactions.

J’ai une classe de domaine qui enregistre le résultat de requêtes réalisées par le biais d’une file d’attente (une queue RabbitMQ).

Depuis l’application web, l’utilisateur “demande” que l’application envoie une requête dans la file.

En gros:

  • l’utilisateur clique sur “démarrer le process”
  • Le contrôleur passe par un Service qui s’occupe de faire toute la plomberie
    • crée un enregistrement qui marque l’existence de la requête
    • récupère l’identifiant de cet enregistrement
    • utilise cet identifiant dans le message poussé dans RabbitMQ, qui devra être conservé tout au long de la chaîne de traitement
  • le message est finalement reçu par un ‘worker’ qui fonctionne sur une autre machine, effectue la tâche demandée et quand il a terminé, poste à son tour un message de contrôle portant l’identifiant original
  • le serveur récupère l’enregistrement portant l’identifiant dans le message et enregistre les informations fournies par le worker dans le message

Rien que de très classique dans ce genre d’architecture, finalement… Normalement, tout est parfait avec ça, mais c’est sans compter sur la merveilleuse couche de persistance de Grails… je vais vous expliquer…

Les services et les transactions avec grails

Dans tout Framework MVC (Model View Controller), les contrôleur sont sensés juste être un pont entre la couche présentation (View) et la couche de données (Model). Ils doivent rester simples et ne rien faire qui dépasse des opérations simples sur les objets. Liste des enregistrements dans une table, retrouver un enregistrement par id, etc…

Mais puisqu’on développe une application, c’est qu’on a besoin de faire des trucs compliqués avec les données qui sont dans la base…

C’est là qu’interviennent les Services. Dès que la complexité d’une opération dépasse ce que j’ai décrit un peu avant, il faut passer par un Service. Si vous gérez une bibliothèque et que vous voulez trouver tous les emprunts de plus de 3 semaines pour les livres de Science-Fiction, vous n’allez pas écrire le code nécessaire dans le contrôleur pour les livres. On ne met pas de logique “Business” dans le contrôleur. Vous allez écrire un service qui implémente une méthode listeDesEmpruntsAEcheance(categorie) et l’appeler depuis votre contrôleur…

Une transaction mais cékoidon ?

Par défaut, dans Grails, tous les Services (les classes de domaine aussi d’ailleurs) sont encapsulés dans un gestionnaire de transactions, et à l’appel de chaque méthode du service, une nouvelle transaction est ouverte. Si, au cours de l’execution de cette méthode, on fait des modifications sur la base de données, ces modifications ne sont réellement transmises à la base de données (commit) que si la méthode ne provoque pas d’erreur. Le gestionnaire garde une sorte de journal avec la liste des opérations à effectuer et les joue sur la couche de données à la fin d’une operation sans erreur. Dans le cas contraire, le gestionnaire de transaction va lire le journal à l’envers et défaire une à une les opérations du journal (rollback)

C’est beaucoup plus compliqué que ça mais c’est en gros comment ça marche…

Si au cours du traitement, une nouvelle méthode du Service ou d’un autre Service est appelée, une nouvelle transaction est créee et la même logique intervient… pas d’enregistrement des données si ça pète.

Le gros problème avec ça c’est que les transactions du point de vue de la base données sont linéaires. On ne peut pas ouvrir une nouvelle transaction au milieu d’une autre. Alors que du point de vue d’un programmeur (et de la couche d’abstraction décrite ici), il est évident qu’une pile d’appels génère une pile de transactions…

Je n’ai pas encore bien compris comment ce problème était résolu par le gestionnaire de transactions qu’utilise Grails (celui du projet Spring), mais je pense que c’est de là que provient le bug qui que je vais tenter de vous explique (vous êtes encore là ? chapeau !)

Les threads, ces amis qui vous veulent du mal

Il est évident qu’un serveur d’application peut recevoir plein de requêtes simultanément et doit y répondre le plus vite possible. Pour régler ce problème grails gère un pool de fil d’execution (des Threads) et passe chaque requête à un thread qui va la passer au bon controleur, qui à son tour va appeler un service etc, etc…

Tout ce petit monde fonctionne en parallèle, se bloque quand il faut pour ne pas modifier les données en même temps (deadlock), etc… donc tous les services sont appelés dans des threads séparés.

Par exemple, le système qui va écouter une file d’attente pour recevoir les messages provenant de RabbitMQ va fonctionner dans son propre thread, etc … ça se configure de façon très simple. Par exemple, avec le plugin rabbitmq-native:

recevoir les messages avec RabbitMQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
rabbitmq {
  connection = {
    connection host: '192.168.55.100',
    username: 'guest',
    password: 'guest',
    threads: 10
  }
  queues = {
    exchange name: 'amq.topic', type: 'topic', autoDelete: false, durable: true, {
      // ...
      queue name: 'control', durable: true, autoDelete: false, binding: 'control.*'
    }
  }

}

On déclare 10 threads qui vont attendre des messages sur le broker (donc on a configuré le worker pour traiter 10 messages simultanés - c’est beaucoup dans le cas de l’application que je développe, mais c’est juste pour essayer, et justement, être certain de provoquer des problèmes qui proviendrait de mises à jour concurrentes sur des données), et une file d’attente qui s’appele control. Je ne vais pas m’étendre sur les exchange et autres, ce n’est pas le propos…

Pour recevoir les messages, il suffit de faire ça dans une classe groovy:

1
2
3
4
static rabbitConfig = [
    queue: 'control',
    retry:   true
]

On ajoute une petite méthode qui par convention s’appele handleMessage()

1
2
3
4
5
6
7
8
def myService // injecte automatiquement une instance du service MyService dans le
              // récepteur de messages
def handleMessage(message, MessageContext context) {

  log.debug "Received result from worker r:${message.id}"

  myService.acknowledgeWorkerJob(message)
}

C’est trop beau, trop simple pour être vrai ! (oui :-)) et on remarque encore une fois qu’on répond au principe très important de séparation de la couche de transmission des données avec la couche de gestion d’icelles…

On se souvient que ce code va être executé dans un des 10 threads lancés par le plugin qui attend des messages …

l’interface utilisateur

Lorsque l’utilsateur demande l’execution d’un job, le controleur appelle directement le service:

1
2
3
4
5
def job(DomainClass domain) {
  myService.postJob(domain)
  flash.message =  "Request for job '${domain}' performed"
  redirect action: "show", params:[id:domain.id]
}

Et la méthode idoine du service (postJob()):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def postJob = { Domain d ->
      requestJob(d)
}

private def requestJob(Domain d) {
  JobRequest jr // another domain class that allows to record that a job
                // execution request was performed

  // crée une nouvelle instance de JobRequest qui porte les infos sur
  // les données à traiter (d)
  jr = new JobRequest(domain: d)

  // on demande à la couche de persistance de sauvegarder la JobRequest
  // à ce moment, elle est sensée lui attribuer un nouvel identifiant
  jr.save(failOnError: true)

  // le contenu du message envoyé par RabbitMQ
  // une simple Map qui porte l'identifiant de la requête
  def message = [
    jr: jr.id.toString(), // toString() car ce sont des UUID et on a  besoin d'une représentation portable
    // ... et les autres données nécessaires au worker qui va recevoir
    // le message
    domain: d.id.toString(),
    // ...
  ]

  def key = "worker.${d.routing_info}"
  log.debug "Pushing to worker['$key'] d.id: ${d.id}"

  // et là on passe par le plugin RabbitMQ pour envoyer un message
  // dans la bonne file
  new RabbitMessageBuilder().send() {
    exchange = 'amq.topic'
    routingKey = key
    body = message
  }

  log.debug("job request for ${d.id} posted to rabbitmq with routingKey $key ${jr.id}")
}

Et la méthode qui reçoit les données depuis la file control

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def acknowledgeWorkerJob(Map message) {
    // on valide le message, on ne sait jamais !
    validateAckMessage(message)

    // on récupère les information dans le message
    // le plus important étant jr_id
    // (on reconstruit des UUID)
    def jr_id = UUID.fromString(message.jr)
    def domain_id   = UUID.fromString(message.domain)
    // et les autres infos (résultat du worker, etc...)

    // récupère la JobRequest correspondante dans la base
    JobRequest jr = JobRequest.get(jr_id)

    if (!jr) {
      // ??? On n'a pas trouvé de JobRequest correspondante ???
      throw new IllegalArgumentException("can't find JobRequest with id: ${jr_id}")
    }


    // c'est bon, on peut faire ce qu'on veut, maintenant

    log.debug("Job ACK received: ${jr.id}")

    if (jr.domain.id != domain_id) {
        // MMMM !? bizarre, on a un problème d'intégrité référentielle ???
        throw new IllegalArgumentException("Invalid ack message ! JobRequest domain (${jr.domain}) doesn't match message's id (${domain_id})...")
    }


    // maintenant, on peut faire le vrai boulot...
}

Mon problème

Revenons à mon problème…

Le truc qui ne va pas, c’est que quoi que j’essaye (et j’ai essayé plein de trucs !), le record jr dans la méthode requestJob(Domain d) n’est jamais effectivement créé. Après l’aller-retour entre le serveur et les workers, la méthode acknowledgeWorkerJob() ne retrouvera jamais l’instance de JobRequest qui a été créée au début du traitement.

Je pense que ça provient d’un problème de pool de threads différent. Un pool de threads géré par le serveur d’application qui a son propre gestionnaire de transactions et l’autre pool créé par le plugin RabbitMQ.

Je cherche encore… même en faisant boucler acknowledgeWorkerJob() en attendant de trouver un record qui correspond, ça ne fonctionne pas…

Bizarrement, j’ai réussi à faire fonctionner l’ensemble en faisant en sorte que les méthodes appelées directement par le controleur passent pas un nouveau pool de threads éphémères en utilisant la bibliothèque GPARS de groovy:

snippet.groovy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
    // pour une simple instance provenant du controleur
    def postJob = { Domain d ->
      withPool(2) {
        // try to trick the framework into calling the sendCrawl method inside
        // a thread in order to understand what's going on :
        // http://stackoverflow.com/questions/27670529/strange-grails-service-persistance-issue
        [d].eachParallel { e ->
          requestJob(e)
        }
      }
    }

    // liste de Domain provenant du controleur
    def parallelPostJobs = { domains ->
      // work in parallel (20 threads) on the list of Domain instances
      withPool(20) {
        domains.eachParallel { d ->
          postJob(d)
        }
      }
    }

Et avec ça, de façon bizarre autant qu’étrange, acknowledgeWorkerJob() retrouve ses petits !

C’est une solution, mais pour moi, elle pue la mort à des kilomètres, n’importe quel développeur normal qui lirait ça devrait venir dans mon bureau “m’expliquer des trucs” avec une hache de pompier à la main ;-) En tout cas, moi c’est ce que je ferais…

parallelPostJobs() était là depuis le début et fonctionnait parfaitement, et c’est comme ça que je suis venu à ce contournement moisi pour une seule requête…

- non mais tu fais quoi la Dédé ?! c’est quoi ce bazar !?
- euh…
- non mais !? … tu es malade ou quoi ?! c’est quoi ce truc avec GPARS dans ton controleur ? Hein ?!
- ben… euh… lis donc cet article et ferme ta mouille !

En attendant, j’attends avec impatience un réponse à ma question sur Stack Overflow :)

Edit

Après un commentaire de Luis, j’ai décidé de comprendre un peu mieux comment grails gérait les transactions…

La lecture de cet article sur le blog d’Octo a été très instructive !

En gros,

  • Les services dans Grails sont par défaut @Transactional, ce qui veut dire que tous les appels passent automatiquement par une méthode “proxy” qui démarre une transaction, la commite si tout va bien, et la défait (rollback) en cas de levée d’exception - il faut cependant faire attention au type d’exception, car ce n’est pas le cas des “checked” exceptions par défaut.
  • ma méthode de service marquée private ne bénificiait pas de ce proxy, et donc je pouvais toujours m’acharner sur tous les paramètres possibles de @Transactional que j’essayais de placer dessus…
  • par défaut, Grails génère le code des controleurs avec @Transactional(ReadOnly=true), ce qui fait qu’on ne peut pas faire de modifications sur la base avec une méthode d’un controleur sans la marquer individuellement par @Transactional. Et c’était mon erreur. J’avais juste ajouté une “action” au contrôleur sans changer le défaut en lecture seule, et je me retrouvais avec une session en lecture seule dans une méthode où j’essayais de faire un insert().
  • Mes bidouilles avec des threads n’ont fait que masquer le problème car c’était des closures ! et donc elles ne passaient pas par le proxy transactionnel et ça avait l’air de fonctionner

Une jolie accumulation d’erreurs de débutant…

Comments