Назад: FFI Выше: Содержание Вперед: Ввод/Вывод

Параллелизм

Для обеспечения параллелизма Rubinius поддерживает те же базовые конструкции, что и MRI Ruby: Treads и Fibers. Создан также и новый API, для реализации акторов: Actors. Акторы обеспечивают параллелизм без участия мутексов и блокировок в передаче статуса между тредами.

Акторы выполняются параллельно, но не имеют общего статуса. Вместо этого они передают другим акторам сообщения. В качестве примера создадим два актора при помощи Actor.spawn: ping и pong. Пусть они обмениваются сообщениями до тех пор, пока совместно не произведут инкремент переменной до 1000:

require 'actor'
pong = nil
ping = Actor.spawn do
  loop do
    count = Actor.receive
    break puts(count) if count > 1000
    pong << (count + 1)
  end
end
pong = Actor.spawn do
  loop do
    count = Actor.receive
    break puts(count) if count > 1000
    ping << (count + 1)
  end
end
ping << 1
sleep 1

Обратите внимание: акторы принимают сообщения посредством Actor.receive. Этот вызов блокирует актор до тех пор, пока сообщение не придет в его <<почтовый ящик>>. Передавая Actor.receive блок и используя фильтрацию сообщений, можно учитывать в алгоритме тип сообщения:

Actor.receive do |filter|
  filter.when(Ready) do |who|
    # SNIP
  end
  filter.when(Work) do |work|
    ready_workers.pop << work
  end
  filter.when(Actor::DeadActorError) do |exit|
    print "Actor exited with message: #{exit.reason}\n"
    ready_workers << Actor.spawn_link(&work_loop)
  end
end

Фильтры применяют к сообщениям метод ===, поэтому when() можно передавать, например, регулярные выражения, классы или процедуры --- в зависимости от конкретной ситуации.

Акторы могут приобретать отношения <<предок-потомок>> посредством Actor.spawn_link. Если потомок по какой-то причине <<умирает>>, актор-родитель может получить об этом извещение, если перед запуском потомка переменная Actor.trap_exit была установлена в true. Создадим актор-супервизор, управляющий некоторой очередью при помощи 10 акторов-работников, выполняющих непосредственный процессинг очереди. Если работник умирает (т.е. инициирует исключение), супервизор получает сообщение Actor::DeadActorError.

require 'actor'

Ready = Struct.new(:this)
Work = Struct.new(:msg)

@supervisor = Actor.spawn do
  supervisor = Actor.current
  work_loop = Proc.new do
    loop do
      work = Actor.receive
      puts("Processing: #{work.msg}")
      supervisor << Ready[Actor.current]
    end
  end

  Actor.trap_exit = true
  ready_workers = []
  10.times do |x|
    # start 10 worker actors
    ready_workers << Actor.spawn_link(&work_loop)
  end
  loop do
    Actor.receive do |f|
      f.when(Ready) do |who|
        # SNIP
      end
      f.when(Work) do |work|
        ready_workers.pop << work
      end
      f.when(Actor::DeadActorError) do |exit|
        print "Actor exited with message: #{exit.reason}\n"
        ready_workers << Actor.spawn_link(&work_loop)
      end
    end
  end
end

10.times do |idx|
  @supervisor << Work[idx]
end
sleep 1

Приведенный пример --- фрагмент кода из <>. Если у Вас возникли вопросы, поизучайте этот проект.

Назад: FFI Выше: Содержание Вперед: Ввод/Вывод

Tweet at @rubinius on Twitter or email community@rubini.us. Please report Rubinius issues to our issue tracker.