Ir para o conteúdo

Rubys Concorrentes com Threads

Uma thread é uma forma de dividir um processo em diversas tarefas que podem ser executadas simultaneamente. O suporte de thread é dado pelo sistema operativo, conhecido por kernel-level thread ou implementada por uma biblioteca de uma determinada linguagem, sendo neste caso user-level Thread. Uma thread permite que o utilizador do programa utilize determinada funcionalidade do ambiente enquanto outras threads processão outras operações. Em sistemas com um único CPU, cada thread é processada aparentemente em simultâneo, a mudança entre cada thread é feita de forma muito rápida dando a ideia de que o processamento é paralelo. Em sistemas com múltiplos CPUs ou multi-core as threads podem ser executadas realmente de forma paralela.

Os exemplos apresentados irão utilizar a API Thread da linguagem Ruby. Esta API não utiliza threads nativas, ou seja, kernel-level thread, mas sim user-level thread. Desta forma é sacrificada a eficiência oferecida pelas threads nativas ao sistema operativo, mas ganha em portabilidade, dados que as threads escritas num sistema operativo irão funcionar em qualquer outra que suporte a linguagem Ruby. Versões futuras como Ruby 2.0 poderão vir a suportar threads nativas, dando ao programador mais possibilidades de desenvolvimento.

Criar e Executar Threads

Em Ruby as threads podem ser criadas como qualquer outro objecto, utilizando o método new. Ao chamar este método devemos passar à thread um bloco com o código a ser executado por ela. Vejamos um exemplo muito simples da utilização de threads em Ruby.

palavras = ["Um","Dois","Tres","Quatro"]

numeros = [1,2,3,4]

puts "Sem Threads...."

palavras.each { |palavra| puts(palavra) }

numeros.each { |numero| puts(numero) }


puts "Com Threads...."

Thread.new {
  palavras.each { |palavra| puts(palavra + " - ") }
}

Thread.new{
  numeros.each { |numero| puts(numero) }
}

sleep(5)

output

Sem Threads....
Um
Dois
Tres
Quatro
1
2
3
4
Com Threads....
Um - 1
Dois - 2
Tres - 3
Quatro – 4

A diferença é bem visível. Com apenas a thread principal, primeiro são processadas as palavras e em seguida os números, por sua vez com várias threads em simultâneo podemos ver que tanto as palavras como os números são processados em simultâneo, embora não seja um simultâneo real como vimos anteriormente. O comando sleep(5) no final faz com que o nosso programa espere 5 segundos, este comando é utilizado para que seja possível ver os resultados do programa com threads. Mais à frente veremos como resolver este pequeno problema sem ter de recorrer a delays.

A Main Thread

Mesmo que não criemos threads explicitamente, existe sempre pelo menos uma thread em execução. Esta thread é chamada de main thread. É nesta thread que o programa está a correr. Podemos verificar isso através do seguinte código:

p(Thread.main)

output

#<Thread:0xb7ddc1bc run>

É retornado o ID em hexadecimal e o seu estado run, ou seja, a informação sobre a thread principal que arranca quando um interpretador Ruby começa a execução.

Estados de uma Thread

Cada thread pode encontrar-se em um dos seguintes estados

  • run - A thread está a ser executada.
  • sleep - A thread está em espera ou sleep.
  • aborting - A thread está a ser abortada.
  • false - A thread terminou normalmente.
  • nil - A thread terminou com uma excepção.

Podemos obter o estado de uma thread usando o método status. Ao pedido do estado de uma thread podemos ainda obter dead no caso dessa thread já não existir. Vejamos um simples exemplo que permite ilustrar todos os estados da thread.

puts(Thread.main.inspect)

puts(Thread.new{ sleep }.kill.inspect)

puts(Thread.new{ sleep }.inspect)

t1 = Thread.new{ }

puts(t1.status)

t2 = Thread.new{ raise("Exception") }

puts(t2.status)

output

#<Thread:0xb7dc41ac run>
#<Thread:0xb7db66c4 dead>
#<Thread:0xb7db6660 sleep>
false
nil

Obter o estado de uma thread é uma operação bastante simples como se pode ser no exemplo acima.

Garantir que a Thread é Executada

Voltamos agora ao problema mostrado no nosso primeiro exemplo em que o programa terminava antes que as threads fossem executadas, nesse exemplo resolvemos o problema com a inserção de um sleep no nosso programa, mas inserir delays na aplicação não é de forma alguma a melhor solução. Para isso usamos o método join, este método obriga a que a thread invocadora espere que a nova thread termine e só então esta continua a execução. Modificando então o nosso exemplo, obtemos o seguinte exemplo.

palavras = ["Um","Dois","Tres","Quatro"]

numeros = [1,2,3,4]

t1 = Thread.new {
  palavras.each { |palavra| puts(palavra + " - ") }
}

t2 = Thread.new{
  numeros.each { |numero| puts(numero) }
}

t1.join()
t2.join()

output

Um - 1
Dois - 2
Tres - 3
Quatro - 4

Como podemos ver o resultado é precisamente o pretendido e desta vez sem delays artificiais e sem o tempo de execução negligenciado. O método join pode ainda ter como argumento um inteiro que define o tempo em segundos pelo qual a thread chamadora deve esperar no máximo pela thread invocada. Basicamente, é um timeout para o join.

Prioridades das Threads

Até agora demos ao Ruby total liberdade de gerir o tempo passado em cada uma das threads. Mas em determinadas situações certas threads são mais importantes que outras. Por exemplo, temos uma thread que guarda uma determinada quantidade de dados num ficheiro e outras que mostra o progresso da gravação, faz sentido que a thread responsável pela escrita no ficheiro tenha disponível mais tempo que a thread que apenas mostra o progresso da gravação. Para isso o Ruby permite a utilização de inteiros para indicar a prioridade de cada thread. Em teoria threads com prioridade mais alta tem mais tempo de execução que threads com prioridade mais baixa. Na prática não é tão linear, devido a outros factores como, por exemplo, o número de threads que estão a correr. Uma vez que as prioridades em pequenos programas são praticamente impossíveis de visualizar, vamos utilizar a uma função factorial que será chamada 100 vezes por cada um das threads.

def fac(n)
  n == 1 ? 1 : n * fac(n - 1)
end

t1 = Thread.new{ 
  0.upto(100) {
    fac(50)
    puts("t1\n")
  }
}

t2 = Thread.new{
  0.upto(100) {
    fac(50)
    puts("t2\n")
  }
}

t3 = Thread.new{
  0.upto(100) {
    fac(50)
    puts("t3\n")
  }
}

t1.priority=0
t2.priority=0
t3.priority=1

t1.join()
t2.join()
t3.join()

Como podemos ver são criadas três threads, ambas com a mesma funcionalidade, apenas com identificadores diferentes. Se colocarmos todas as threads com a mesma prioridade o output será o seguinte.

output

t1
t2
t3
t1
t2
t3
t1
t2
t3

Ou seja, thread 1 seguida pela thread 2 e por fim a thread 3, a sequência repete-se até ao final. Por outro lado, se, por exemplo, dermos prioridade 1 à thread t3, e 0 à t1 e t2, como mostrado no código acima, o resultado será o seguinte.

output

t1
t2
t3
t1
t3
t3
…
t3
t2
t1
t2
t1
t2

Ou seja, a thread 3 passa a ter mais tempo de execução que as threads 1 e 2. Assim sendo, é executada e termina primeiro que as restantes, e em seguida as thread 1 e 2 correm em paralelo dado terem a mesma prioridade. Também podem ser utilizados número negativos para definir a prioridade das threads, por vezes pode até ser preferível o uso deste número devido à thread main que tem prioridade 0, como vamos ver em seguida.

Prioridade da Main Thread

Como qualquer thread, também a main thread tem um grau de prioridade que por defeito é 0. Sendo assim ao atribuir valores positivos para as restantes threads estamos a dar maior prioridade a estas threads em relação à main. Ao usar valores negativos para as outras threads estamos a garantir que a main thread estará sempre a um nível superior em relação a todas as outras threads, garantindo a coerência de execução. No caso de se preferir usar números positivos, podemos definir a prioridade da main thread com uma número elevado que não será superado pelas restantes, por exemplo, 100.

Thread.main.priority=100

Criar mas não executar uma Thread

Todos os exemplos que vimos até agora ao criar uma thread ela automaticamente começa a sua execução, aliás no exemplo anterior com prioridades podemos ver que antes das prioridades serem definidas as thread são executadas pela ordem que são criadas e só em seguida passam a ser executadas pelas prioridades que lhes são dadas, para resolver este pequeno problema basta usarmos o método stop no início da definição da nossa thread.

t1 = Thread.new{
  Thread.stop
  0.upto(100) { |i|
    puts i
  }
}

t1.run
t1.join()

output

0
1
...
99
100

Desta forma a thread t1 apenas é executada após ser chamado o método run da thread. Caso este método não seja chamado, a thread não entra em execução, embora esteja criada. Se verificarmos o seu estado antes de chamar o método run, podemos ver que a thread se encontra no estado sleep.

Mutexes

Em alguns casos é necessário que duas ou mais threads utilizem o mesmo recurso global, por exemplo, uma variável global. Nesta situação podemos obter resultados imprevisíveis, uma vez que pode acontecer uma thread alterar esse recurso no mesmo instante que outra thread o utiliza, fazendo com que esta use um recurso obsoleto. Vejamos o exemplo seguinte do que pode acontecer.

$i = 0

a = Thread.new { 100000.times { $i += 1 } }
b = Thread.new{ 100000.times{ $i += 1 } }

a.join
b.join

puts($i)

O que o nosso programa faz é simplesmente usar duas threads para incrementar uma unidade à variável global i 100000x em cada thread. O esperado seria termos o resultado de 200000, mas como podemos ver não é bem isso que obtemos, mas sim o valor de 109589. A razão pela qual isto acontece é porque ambas as threads estão a utilizar o mesmo recurso e uma vez que ambas são executadas praticamente em paralelo em determinados pontos, estas threads vão utilizar um recurso que na realidade já não existe, ou seja, a thread a incrementa a variável, mas nesse mesmo instante a thread b também o faz, uma vez que não existe controlo de acesso à variável. Vejamos um exemplo simples. A variável i tem valor 1000. A thread a vai buscar esse valor e incremente passando a 1001. Ao mesmo tempo, a thread b também vai buscar o valor e também o incrementa para 1001, perdendo-se então 1 unidade.

Para resolver este problema temos de garantir que uma thread só terá acesso ao recurso depois que este tenha sido libertado por outra thread que o tenha utilizado. Neste sentido, o Ruby fornece a classe Mutex e através do método synchronize podemos garantir isso mesmo. Vejamos então o exemplo:

require "thread"

$i = 0;

semaphore = Mutex.new

a = Thread.new{
  semaphore.synchronize {
    100000.times { $i += 1 }
  }
}

b = Thread.new{
  semaphore.synchronize {
     100000.times { $i += 1 }
  }
}

a.join
b.join

puts($i)

E como esperado o valor obtido é 200000. Isto deve-se ao método synchronize, que bloqueia os recursos globais usados dentro do seu bloco e apenas os liberta após o bloco ter terminado.

Passagem de Argumentos à Thread

O método new permite ainda a passagem de argumentos para a thread, tornando assim possível enviar dados para serem processados na thread sem ter de recorrer a recursos globais, como por exemplo variáveis globais.

t1 = Thread.new("magician") { |arg|
  Thread.stop
  puts(arg)
}

puts("Hello")

t1.run
t1.join

Como podemos ver, enviámos uma string que depois é processada pelo método puts que imprime a string. E por sua vez, a main thread imprime a string Hello obtendo assim o output seguinte.

output

Hello 
magician

Retornar Valores de uma Thread

É também possível retornar valores de uma thread. Para isso utilizamos o método value. Este método retorna o valor final após a execução do bloco da thread. Caso nenhum valor tenha sido obtido, o resultado obtido por este método será nil. Vamos então ver alguns casos de como podemos obter estes valores.

t1 = Thread.new{
  puts "T1"
}

t2 = Thread.new{
  5 + 5
}

t3 = Thread.new{
  "X"
}

t4 = Thread.new{
  4 + 4
  "Y"
}

t5 = Thread.new{
  val = 1 + 1
  "T5"
  val
}

t6 = Thread.new{
  2 + 2
  "T"
  puts "T6"
}

puts("Valor = " + (t1.value == nil ? "nil" : t1.value.to_s))
puts("Valor = " + t2.value.to_s)
puts("Valor = " + t3.value)
puts("Valor = " + t4.value)
puts("Valor = " + t5.value.to_s)
puts("Valor = " + (t6.value == nil ? "nil" : t6.value.to_s))

output

T1
T6
Valor = nil
Valor = 10
Valor = X
Valor = Y
Valor = 2
Valor = nil

Como é possível ver a thread t1 não retorna qualquer valor. Como tal o valor resultante é nil. Por sua vez, a thread t2 retorna um inteiro com o resultado de 5 + 5. Da mesma forma que a thread t3 retorna uma string com X. As threads 4, 5 e 6 mostram que o valor retornado é sempre o último a ser processado no bloco, embora não seja colocada a palavra return o Ruby assume que a última instrução contém o valor a retornar como podemos ver na thread t6.

Matar uma Thread

Por vezes pode ser necessário terminar a thread antes que ela termine a execução do seu bloco de código. Isto pode acontecer, por exemplo, quando queremos cancelar uma operação que por algum motivo já não queremos que seja executadas, mas em que a sua execução já foi iniciada. Vejamos o exemplo seguinte em que uma thread imprime a cada segundo a string T1 indefinidamente e após esperar 10 segundos o nosso programa força a nossa thread a terminar.

t1 = Thread.new{
  while true
    puts("T1")
    sleep(1)
  end
}


sleep(10)

t1.terminate
puts(t1.alive?)

output

t1
t1
...
t1
t1
false

O método kill obriga a que a thread termine e retorna-a. Em seguida passa a execução para outra thread. Caso não existam mais threads a executar ou a thread terminada seja a main thread, então é processado o encerramento do programa. Para além do método kill, podemos ainda usar os métodos exit ou terminate. Qualquer um deles realiza a mesma operação.

Passar a Execução para a proxima Thread

Em determinadas situações podemos querer que certa thread que esteja em execução, passe a execução para outras threads. Por exemplo, queremos que após a thread X executar as três primeiras operações ela passe a execução para a thread Y para que esta execute o seu bloco de execução. Para isso podemos utilizar o método pass da classe Thread. Este método faz isso mesmo, invoca o “calendário” de threads passando a execução à próxima thread. Vejamos um exemplo que ilustra o uso deste método.

s = ""

t1 = Thread.new{
  s << "a"
  Thread.pass
  s << "b"
  Thread.pass
  s << "c"
}

t2 = Thread.new{
  s << "x"
  Thread.pass
  s << "y"
  Thread.pass
  s << "z"
}

t1.join
t2.join

puts(s)

Neste caso são iniciadas duas threads. A primeira t1, concatena a string a à string na variável s e passa a execução à thread t2, que concatena também a string e passa a execução novamente para a thread t1 e por aí fora até ambas terminarem os seus blocos de execução, resultando no seguinte output:

output

axbycz

Caso não fosse usado o método pass o resultado seria diferente. A thread t1 iria executar todo o seu bloco de código dado ser pequeno, e em seguida seria a thread t2 a fazê-lo. Nesta situação o output obtido seria o seguinte:

output

abcxyz

Variáveis Locais da Thread

É ainda possível criar e obter variáveis locais numa thread. Podemos criá-las dentro da thread ou fora dela, bem como ter acesso a elas. O processo é muito semelhante a usar uma hash. Passemos a um exemplo que ilustra o processo.

t1 = Thread.new{
  Thread.current[:nome] = "magician"
  Thread.current[:lvl] = 3
}

puts("LVL ? " + t1.key?(:lvl).to_s)
puts("Nome = " + t1[:nome])

t1[:lang] = "ruby"

puts(t1.keys)
t1.join

output

LVL ? true
Nome = magician
lang
lvl
nome

Pelo exemplo podemos ver que é extremamente simples a utilização deste recurso. Caso estejamos dentro da thread, basta usar o current e trabalhar de igual forma como se fosse uma hash. Caso estejamos fora da thread basta usar a variável que detém o objecto Thread como se fosse uma hash. Isto permite, por exemplo, a passagem de valores para dentro da thread caso esta já tenha sido iniciada pelo método new, ou caso queiramos guardar vários valores que podem depois ser consultados no exterior da thread.

Subclasses da Classe Thread

Para além da forma mais comum de criar threads que é utilizando o método new, é possível, implementar threads criando subclasses da mesma. Embora esta forma seja mais comum em linguagens, por exemplo, como Java e C#, é também possível fazê-lo em Ruby. Vamos então ver um exemplo simples de uma subclasse de Thread.

class MinhaThread < Thread

  def initialize(*args)
    super{
      Thread.stop
      args.each { |arg|
        print(arg+"\n")
      }
    }
  end

  def run
    print("Thread a iniciar...\n")
    super
  end

end


m = MinhaThread.new("white", "magician", "P@P")
m.run
m.join

output

Thread a iniciar...
white
magician
P@P

Embora não seja tão trivial como os exemplos dados anteriormente, este é um exemplo simples de uma subclasse da classe Thread em que reescrevemos os métodos run e initialize de forma a que façam mais algumas acções. Poderíamos também reescrever outros métodos, como por exemplo o exit, fazendo com que para além de terminar a thread realizasse outra operação que estaria implícita ao terminar da thread.

ThreadGroups

Para finalizar vamos falar de ThreadGroups. No fundo, um ThreadGroup é apenas um conjunto de threads. Imaginemos o ThreadGroup como um array que contem n Threads. Uma Thread apenas pode pertencer a um grupo. Ao ser adicionada ao grupo x, caso esteja no grupo y, ela é removida do grupo y no instante em que é adicionada ao grupo x. No momento em que é criada, uma Thread pertence ao mesmo grupo a que pertence a Thread que a criou. Threads terminadas ou "mortas" têm ThreadGroup nil, ou seja, não estão em nenhum grupo. Para além de permitir adicionar Threads, um grupo, permite criar uma lista de todas as Threads que pertencem a esse grupo. Para isso basta usar o método list. Permite ainda bloquear o grupo, ou seja, o grupo não deixa serem adicionadas novas Threads, nem que sejam removidas as Threads existentes. Temos agora um pequeno exemplo de como usar ThreadGroups.

tg = ThreadGroup.new

t1 = Thread.new {
  Thread.stop
}

t2 = Thread.new {
  Thread.stop
}


puts(t1.group.to_s + " == " + t2.group.to_s + " == " + Thread.main.group.to_s)

tg.add(t1)
tg.add(t2)

puts(t1.group.to_s + " == " + t2.group.to_s + " != " + Thread.main.group.to_s)

puts("Threads = " + tg.list.to_s)

t3 = Thread.new {

}

puts(t3.group)

tg.enclose

t4 = Thread.new{
  Thread.stop
}

tg.add(t4)

output

#<ThreadGroup:0xb7ca81c4> == #<ThreadGroup:0xb7ca81c4> == #<ThreadGroup:0xb7ca81c4>
#<ThreadGroup:0xb7c99fc0> == #<ThreadGroup:0xb7c99fc0> != #<ThreadGroup:0xb7ca81c4>
Threads = #<Thread:0xb7c99f48>#<Thread:0xb7c99fac>
nil
…`add': can't move to the enclosed thread group (ThreadError)...

Pelo exemplo podemos ver que as Threads ao serem criadas ficam com o mesmo grupo que a Thread que as criou, neste caso a aain thread. Depois de serem adicionadas a outro grupo, deixam de pertencer ao grupo onde se encontra a main thread, e passam a pertencer ao novo grupo. Por sua vez as threads terminadas tem grupo nil, como é o caso da thread t3. Em seguida podemos ver que ao bloquear o grupo com o enclose, e ao tentarmos adicionar uma nova Thread a esse grupo, é lançado um ThreadError, dado que não é possível adicionar ou remover Threads deste grupo. É de sublinhar que um grupo que seja definido como enclose irá permanecer neste estado, uma vez que não é actualmente possível inverter o processo de bloqueio.

Conclusão

Ao longo deste artigo foram exploradas diversas vertentes das threads em Ruby. Conteúdo, como a sua criação, controlo, processamento, finalização e até organização foram explicados e exemplificados de forma simples. É possível agora criar threads bastante complexas e funcionais a partir dos conhecimentos básicos aqui explicados. Como tudo em programação, também as threads devem ser usadas com ponderação e controlo, e não usá-las por tudo e por nada, ou então apenas não as usar.

Referências