--- rpl/src/interruptions.c 2010/08/23 08:04:40 1.34 +++ rpl/src/interruptions.c 2010/08/25 09:06:49 1.35 @@ -1730,6 +1730,7 @@ deverrouillage_gestionnaire_signaux() #define nombre_queues 13 static int *fifos; +static int markov; static int segment; static sem_t *semaphores[nombre_queues]; static sem_t *semaphore_global; @@ -1824,17 +1825,61 @@ creation_fifos_signaux(struct_processus * SIGSTART, SIGINJECT, SIGABORT, SIGFABORT */ + int i; + + unsigned char *nom; + # ifndef IPCS_SYSV // POSIX + + if ((nom = nom_segment((*s_etat_processus).chemin_fichiers_temporaires, + getpid())) == NULL) + { + (*s_etat_processus).erreur_systeme = d_es_allocation_memoire; + return; + } + + if ((segment = shm_open(nom, O_RDWR | O_CREAT | O_EXCL, + S_IRUSR | S_IWUSR)) == -1) + { + free(nom); + (*s_etat_processus).erreur_systeme = d_es_allocation_memoire; + return; + } + + if (ftruncate(segment, nombre_queues * ((2 * longueur_queue) + 4) * + sizeof(int)) == -1) + { + free(nom); + (*s_etat_processus).erreur_systeme = d_es_allocation_memoire; + return; + } + + fifos = mmap(NULL, nombre_queues * ((2 * longueur_queue) + 4) * sizeof(int), + PROT_READ | PROT_WRITE, MAP_SHARED, segment, 0); + close(segment); + + if (((void *) fifos) == ((void *) -1)) + { + if (shm_unlink(nom) == -1) + { + free(nom); + (*s_etat_processus).erreur_systeme = d_es_allocation_memoire; + return; + } + + free(nom); + (*s_etat_processus).erreur_systeme = d_es_allocation_memoire; + return; + } + + free(nom); + # else // SystemV file *desc; - int i; - key_t clef; - unsigned char *nom; - // Création d'un segment de données associé au PID du processus courant chemin = (*s_etat_processus).chemin_fichiers_temporaires; @@ -1863,7 +1908,7 @@ creation_fifos_signaux(struct_processus free(nom); if ((segment = shmget(clef, - nombre_queues * (longueur_queue + 4) * sizeof(int), + nombre_queues * ((2 * longueur_queue) + 4) * sizeof(int), IPC_CREAT | IPC_EXCL | S_IRUSR | S_IWUSR)) == -1) { (*s_etat_processus).erreur_systeme = d_es_allocation_memoire; @@ -1893,6 +1938,7 @@ creation_fifos_signaux(struct_processus * 2 : longueur de la queue (int) * 3 : éléments restants (int) * 4 à 4 + (2) : queue (int) + * 4 + (2) + 1 ) 4 + 2 * (2) : horodatage en centième de secondes. */ for(i = 0; i < nombre_queues; i++) @@ -1945,6 +1991,8 @@ creation_fifos_signaux(struct_processus free(nom); + markov = 0; + return; } @@ -1953,12 +2001,25 @@ liberation_fifos_signaux(struct_processu { int i; +# ifdef IPCS_SYSV // SystemV + if (shmdt(fifos) == -1) { (*s_etat_processus).erreur_systeme = d_es_allocation_memoire; return; } +# else // POSIX + + if (munmap(fifos, nombre_queues * ((2 * longueur_queue) + 4) * sizeof(int)) + != 0) + { + (*s_etat_processus).erreur_systeme = d_es_allocation_memoire; + return; + } + +# endif + for(i = 0; i < nombre_queues; i++) { if (sem_close(semaphores[i]) != 0) @@ -1984,6 +2045,8 @@ destruction_fifos_signaux(struct_process unsigned char *nom; +# ifdef IPCS_SYSV // SystemV + if (shmdt(fifos) == -1) { (*s_etat_processus).erreur_systeme = d_es_allocation_memoire; @@ -2006,6 +2069,32 @@ destruction_fifos_signaux(struct_process unlink(nom); free(nom); +# else // POSIX + + if (munmap(fifos, nombre_queues * ((2 * longueur_queue) + 4) * sizeof(int)) + != 0) + { + (*s_etat_processus).erreur_systeme = d_es_allocation_memoire; + return; + } + + if ((nom = nom_segment(NULL, getpid())) == NULL) + { + (*s_etat_processus).erreur_systeme = d_es_allocation_memoire; + return; + } + + if (shm_unlink(nom) != 0) + { + free(nom); + (*s_etat_processus).erreur_systeme = d_es_allocation_memoire; + return; + } + + free(nom); + +# endif + for(i = 0; i < nombre_queues; i++) { if ((nom = nom_semaphore(getpid(), i)) == NULL) @@ -2040,30 +2129,83 @@ destruction_fifos_signaux(struct_process return; } -int -queue_in(pid_t pid, int signal) +inline int +horodatage() { -#undef printf -// Transformer ce truc en POSIX ! On ne fait du SysV que si on n'a pas le choix + int ts; -# ifndef IPCS_SYSV -# else // Traitement à l'aide d'IPCS SystemV + struct timeval tv; + + gettimeofday(&tv, NULL); + ts = (int) ((tv.tv_sec * 100) + (tv.tv_usec / 10000)); + return(ts); +} + +int +queue_in(pid_t pid, int signal) +{ + int queue; int *base; int *buffer; - int *projection_fifos; - int queue; + int horodatage_initial; int identifiant; - - key_t clef; + int *projection_fifos; sem_t *semaphore; - struct stat s_stat; + queue = queue_de_signal(signal); unsigned char *nom; - queue = queue_de_signal(signal); +# ifndef IPCS_SYSV + + // Ouverture des projections + + if ((nom = nom_segment(NULL, pid)) == NULL) + { + return(-1); + } + + // Dans le cas de SIGSTART, premier signal envoyé à un processus fils, + // il convient d'attendre que le fichier support soit effectivement + // accessible. Dans tous les autres cas, ce fichier doit exister. S'il + // n'existe plus, le processus associé n'existe plus. + + if (signal == SIGSTART) + { + horodatage_initial = horodatage(); + + while((identifiant = shm_open(nom, O_RDWR, S_IRUSR | S_IWUSR)) == -1) + { + if (abs(horodatage_initial - horodatage()) > 500) + { + return(-1); + } + } + } + else + { + if ((identifiant = shm_open(nom, O_RDWR, S_IRUSR | S_IWUSR)) == -1) + { + return(-1); + } + } + + projection_fifos = mmap(NULL, nombre_queues * ((2 * longueur_queue) + 4) + * sizeof(int), PROT_READ | PROT_WRITE, MAP_SHARED, identifiant, 0); + close(identifiant); + + if (((void *) projection_fifos) == ((void *) -1)) + { + return(-1); + } + +# else // Traitement à l'aide d'IPCS SystemV + + key_t clef; + + struct stat s_stat; // Ouverture des projections @@ -2081,7 +2223,15 @@ queue_in(pid_t pid, int signal) { // On attend que le fichier sois présent - while(stat(nom, &s_stat) != 0); + horodatage_initial = horodatage(); + + while(stat(nom, &s_stat) != 0) + { + if (abs(horodatage_initial - horodatage()) > 500) + { + return(-1); + } + } } if ((clef = ftok(nom, 1)) == -1) @@ -2094,13 +2244,13 @@ queue_in(pid_t pid, int signal) if (signal == SIGSTART) { while((identifiant = shmget(clef, - nombre_queues * (longueur_queue + 4) * sizeof(int), + nombre_queues * ((2 * longueur_queue) + 4) * sizeof(int), S_IRUSR | S_IWUSR)) == -1); } else { if ((identifiant = shmget(clef, - nombre_queues * (longueur_queue + 4) * sizeof(int), + nombre_queues * ((2 * longueur_queue) + 4) * sizeof(int), S_IRUSR | S_IWUSR)) == -1) { return(-1); @@ -2114,24 +2264,36 @@ queue_in(pid_t pid, int signal) return(-1); } +# endif + if ((nom = nom_semaphore(pid, queue)) == NULL) { +# ifdef IPCS_SYSV shmdt(projection_fifos); +# else + munmap(projection_fifos, nombre_queues * ((2 * longueur_queue) + 4) + * sizeof(int)); +# endif return(-1); } while((semaphore = sem_open(nom, 0)) == SEM_FAILED); + free(nom); - if (sem_wait(semaphore) != 0) + while(sem_wait(semaphore) != 0) { - shmdt(projection_fifos); - return(-1); + if (errno != EINTR) + { +# ifdef IPCS_SYSV + shmdt(projection_fifos); +# else + munmap(projection_fifos, nombre_queues * ((2 * longueur_queue) + 4) + * sizeof(int)); +# endif + return(-1); + } } - // Il ne faut pas empiler plusieurs SIGSTART car SIGSTART peut provenir - // de l'instruction SWI. Plusieurs threads peuvent interrompre de façon - // asynchrone le processus père durant une phase de signaux masqués. - base = &(projection_fifos[(longueur_queue + 4) * queue]); buffer = &(base[4]); @@ -2141,19 +2303,31 @@ queue_in(pid_t pid, int signal) { sem_post(semaphore); sem_close(semaphore); +# ifdef IPCS_SYSV shmdt(projection_fifos); +# else + munmap(projection_fifos, nombre_queues * ((2 * longueur_queue) + 4) + * sizeof(int)); +# endif return(-1); } base[3]--; // base[1] contient le prochain élément à écrire + + buffer[base[1] + (nombre_queues * base[2])] = horodatage(); buffer[base[1]++] = (int) pid; base[1] %= base[2]; if (sem_post(semaphore) != 0) { +# ifdef IPCS_SYSV shmdt(projection_fifos); +# else + munmap(projection_fifos, nombre_queues * ((2 * longueur_queue) + 4) + * sizeof(int)); +# endif sem_close(semaphore); return(-1); } @@ -2161,18 +2335,36 @@ queue_in(pid_t pid, int signal) sem_close(semaphore); // Fermeture des projections +# ifdef IPCS_SYSV shmdt(projection_fifos); - +# else + munmap(projection_fifos, nombre_queues * ((2 * longueur_queue) + 4) + * sizeof(int)); # endif return(0); } +inline int +chaine_markov(int markov, int delta) +{ + double memoire = 0.9; + int valeur; + + valeur = (int) ((memoire * markov) + ((1 - memoire) * delta)); + valeur = (valeur < 10) ? 10 : valeur; + + return(valeur); +} + pid_t origine_signal(int signal) { + logical1 drapeau; + int *base; int *buffer; + int delta; int pid; int queue; @@ -2181,38 +2373,65 @@ origine_signal(int signal) BUG(queue == -1, uprintf("[%d] Unknown signal %d in this context\n", (int) getpid(), signal)); - if (sem_wait(semaphores[queue]) != 0) + while(sem_wait(semaphores[queue]) != 0) { - return(-1); + if (errno != EINTR) + { + return(-1); + } } - // Le signal SIGCONT peut être envoyé de façon totalement asynchrone. - // Il peut y avoir plus de signaux envoyés que d'interruptions traitées. - // Il convient donc de rectifier la queue lors du traitement de - // l'interruption correspondante. Le gestionnaire étant installé sans - // l'option NODEFER, la queue reste cohérente. - - if (signal == SIGCONT) - { - base = &(fifos[(longueur_queue + 4) * queue]); - buffer = &(base[4]); - base[0] = (base[1] - 1) % base[2]; - pid = buffer[base[0]++]; - base[3] = base[2]; - } - else + // On retire les interruptions anciennes qui ont été ratées sauf s'il + // s'agit de la dernière dans la queue. + + base = &(fifos[(longueur_queue + 4) * queue]); + buffer = &(base[4]); + + if (base[3] == (base[2] - 1)) { - base = &(fifos[(longueur_queue + 4) * queue]); - buffer = &(base[4]); + delta = abs(horodatage() - + buffer[base[0] + (nombre_queues * base[2])]); + // Une seule interruption dans la queue. pid = buffer[base[0]++]; base[0] %= base[2]; base[3]++; + + markov = chaine_markov(markov, delta); } + else if (base[3] >= base[2]) + { + // Aucune interruption n'est dans la queue. + // On a retiré trop d'interruptions de la queue. - if (base[3] > base[2]) + // (base[3] - base[2]) + 1 : nombre d'interruptions manquantes + // base[0] - 1 : dernière interruption lue + pid = buffer[((((base[0] + base[2] - 1) % base[2]) + - ((base[3] - base[2]) + 1)) + base[2]) % base[2]]; + } + else { - sem_post(semaphores[queue]); - return(-1); + // Plusieurs interruptions à distribuer. + drapeau = d_vrai; + + do + { + delta = abs(horodatage() - + buffer[base[0] + (nombre_queues * base[2])]); + pid = buffer[base[0]++]; + base[0] %= base[2]; + base[3]++; + + if ((delta > (2 * markov)) && (base[3] < base[2])) + { + drapeau = d_vrai; + } + else + { + drapeau = d_faux; + } + } while(drapeau == d_vrai); + + markov = chaine_markov(markov, delta); } if (sem_post(semaphores[queue]) != 0) @@ -2981,17 +3200,15 @@ kill_broken_siginfo(pid_t pid, int signa return(-1); } - if ((semaphore = sem_open(nom, 0)) == SEM_FAILED) - { - free(nom); - return(-1); - } - + while((semaphore = sem_open(nom, 0)) == SEM_FAILED); free(nom); - if (sem_wait(semaphore) == -1) + while(sem_wait(semaphore) != 0) { - return(-1); + if (errno != EINTR) + { + return(-1); + } } if ((signal != 0) && (signal != SIGINT)) @@ -3028,17 +3245,15 @@ pthread_kill_broken_siginfo(pthread_t ti return(-1); } - if ((semaphore = sem_open(nom, 0)) == SEM_FAILED) - { - free(nom); - return(-1); - } - + while((semaphore = sem_open(nom, 0)) == SEM_FAILED); free(nom); - if (sem_wait(semaphore) == -1) + while(sem_wait(semaphore) != 0) { - return(-1); + if (errno != EINTR) + { + return(-1); + } } if ((signal != 0) && (signal != SIGINT))