Concorrência


Playlists

Estas notas de aula são baseadas no livro “Parallel and Concurrent Programming in Haskell”, por Simon Marlow: https://simonmar.github.io/pages/pcph.html

1 Haskell Concorrente

  • O objetivo da programação concorrente é definir explicitamente múltiplos caminhos de controle, geralmente para permitir múltiplas interações com usuário ou interfaces externas.

  • Não necessariamente serão executadas em paralelo.

  • A ideia é descrever cada interação em separado, mas fazer com que elas ocorram ao mesmo tempo (intercaladamente).

  • Considere um Web Browser que permite carregar múltiplas páginas ao mesmo tempo.

  • Além disso, enquanto ele carrega uma ou mais páginas, a interface ainda interage com o usuário (apertar o botão cancelar, etc.)

  • Um servidor Web também implementa concorrência para servir múltiplos pedidos ao mesmo tempo.

  • Imagine a situação de um servidor Web atendendo um pedido de página por vez.

  • O Haskell fornece um conjunto de funcionalidades simples mas genéricas que podem ser utilizadas para definir estruturas mais complexas.

  • Com isso fica a cargo do programador definir o melhor modelo de programação concorrente.

2 Conceitos Básicos - Threads

  • Uma thread é a menor sequência de computação que pode ser gerenciada de forma independente pelo gerenciador de tarefas.

  • Um processo é um procedimento computacional completo que pode conter diversas threads de execução.

  • Múltiplas threads de um mesmo processo compartilham a memória alocada, podendo utilizá-la para troca de mensagens.

  • Múltiplos processos não compartilham memória alocada.

No Haskell criamos uma thread com a função forkIO:

import Control.Concurrent

forkIO :: IO () -> IO ThreadId

Ela recebe uma ação computacional de IO como argumento e retorna um identificador dessa thread.

A ideia é que todo efeito colateral feito pela ação IO será feito de forma concorrente com outras threads:

import Control.Concurrent
import Control.Monad
import System.IO

main = do
  hSetBuffering stdout NoBuffering
  forkIO (replicateM_ 100000 (putChar 'A'))
  replicateM_ 100000 (putChar 'B')

A primeira linha da função main desativa o buffer para executar toda ação IO no momento exato em que ela é enviada.

A segunda linha cria uma thread que imprimirá o caractere A dez mil vezes.

A terceira linha imprime o caractere B dez mil vezes.

A execução do programa resultará em caracteres A e B intercalados:

AAAAAAAAABABABABABABABABABABABAABABABABAB ...

2.1 Experimente no Repl.it

Todo o código dessa seção pode ser utilizado no Repl.it abaixo:

Código no Repl.it




3 Exemplo: Lembretes

  • Vamos criar uma função multithread que aguarda o usuário entrar com um tempo em segundos e cria uma thread para imprimir uma mensagem após esse número de segundos tenha passado:
import Control.Concurrent
import Control.Monad

main = forever $ do
           s <- getLine
           forkIO $ setReminder s
  • A função forever está definida na biblioteca Control.Monad e simplesmente repete a ação eternamente. A função setReminder pode ser definida como:
setReminder :: String -> IO ()
setReminder s = do
  let t = read s :: Int
  putStrLn ("Ok, agendado para " ++ show t ++ " segs.")
  threadDelay (10^6 * t)
  putStrLn (show t ++  " segundos se passaram! BING!"
  • A função threadDelay suspende a execução da thread por \(n\) microsegundos.

Exemplo de execução:

$ ./lembrete
2
Ok, agendado para 2 segs.
4
Ok, agendado para 4 segs.
2 segundos se passaram! BING!
4 segundos se passaram! BING!

3.1 Exercício 1

  • Altere o programa anterior para terminar o programa assim que o usuário digitar quit.

DICA: troque o uso de forever por uma função definida por você denominada repita.

3.2 Exercício 1 - Resposta

main = repita
 where repita = do
   s <- getLine
   if s == "quit"
   then return ()
   else do forkIO $ setReminder s
   repita

3.3 Experimente no Repl.it

Experimente o código dessa seção no Repl.it abaixo:

O Repl.it tem um bug para o tratamento do stdin, stdout e uso do `getLine`:

Right now, the Haskell repl has few limitations. In the repl, stdout won’t be echoed until your program completes execution, and stdin is completely ignored. So if you want to use something like getLine, it’s probably best to put that in the editor. Aside from that, it should behave eactly like ghci.

Fonte: https://blog.repl.it/haskell

Isso causa um problema na saída do programa abaixo: letras ficam embaralhadas e saídas duplicadas. O código, contudo, está correto e funcionará sem problemas em um terminal “de verdade”.

Código no Repl.it




4 Comunicação entre threads: MVar

Para que as threads possam se comunicar é necessário a existência de um espaço de memória compartilhada.

No Haskell isso é implementado através do tipo MVar:

data MVar a

newEmptyMVar :: IO (MVar a)
newMVar      :: a -> IO (MVar a)
takeMVar     :: MVar a -> IO a
putMVar      :: MVar a -> a -> IO ()
  • O tipo MVar é um container para qualquer tipo de dado. Você pode armazenar um valor Integer, uma String, uma lista de Bool, etc.

  • A função newEmptyMVar cria um MVar inicialmente vazio. A função newMVar recebe um argumento x e retorna um MVar contendo x.

  • As funções takeMVar e putMVar, inserem e removem um conteúdo em um MVar.

Notem que MVar armazena apenas um único valor em sua estrutura:

  • Se uma thread chamar takeMVar e ela estiver vazia, ficará bloqueada em espera até que algum conteúdo seja inserido.
  • Se uma thread chamar putMVar e ela estiver cheia, ficará bloqueada em espera até que alguema thread utilize takeMVar.

Considere o seguinte código exemplo:

main = do
  m <- newEmptyMVar
  forkIO $ do putMVar m 'x'; putMVar m 'y'
  r1 <- takeMVar m
  print r1
  r2 <- takeMVar m
  print r2

Inicialmente, cria-se uma MVar vazia:

Em seguida, criamos uma thread que armazena dois caracteres na sequência, ao inserir o primeiro caractere a thread fica bloqueada aguardando espaço ser liberado.

Nesse momento o thread principal recupera o valor de MVar e armazena em r1, liberando espaço para a thread armazenar o segundo caractere.

Ao final, o segundo caractere é recuperado e MVar se torna vazio.

4.1 Mecanismo anti-burrice

E se o programador escrever o seguinte programa?

main = do
  m <- newEmptyMVar
  takeMVar m

\tiny Imagem de JacLou DL por Pixabay

A execução retornará:

$ ./burro
burro: thread blocked indefinitely in an MVar operation
  • Se você criar um programa em que a thread fica bloqueada eternamente, em muitos casos o programa emite uma exceção BlockedIndefinitelyOnMVar.

4.2 Experimente no Repl.it

Experimente o código dessa seção no Repl.it abaixo:

Código no Repl.it




5 Construindo tipos mutáveis com MVar

Como sabemos, os tipos do Haskell são imutáveis, ou seja, uma vez que definimos uma variável ela não pode mudar de valor.

O tipo MVar pode nos ajudar a simular um tipo mutável a partir de um tipo imutável de forma transparente!

A biblioteca Data.Map fornece o tipo mapa associativo:

data Map k a

import qualified Data.Map as M

que define um mapa associativo com chave do tipo k e valores do tipo a.

Essa biblioteca possui as funções:

M.empty  :: Map k a

M.insert :: Ord k => k -> a -> Map k a -> Map k a
M.insert k v m = -- insere valor v na chave k do mapa m,
                 -- substituindo caso já exista

M.lookup :: Ord k => k -> Map k a -> Maybe a
M.lookup k m = -- retorna o valor na chave k do mapa m,
             -- retorna Nothing caso não exista a chave

5.1 Agenda telefônica


**Agenda telefônica**

Vamos criar o tipo agenda telefônica:

type Nome   = String
type Numero = String
type Agenda = M.Map Nome Numero

Mas uma agenda telefônica não pode ser uma estrutura imutável. Preciso ter a capacidade de inserir novos nomes e atualizar as entradas. Vamos definir uma agenda telefônica mutável como:

newtype AgendaMut = AgendaMut (MVar Agenda)

5.2 Exercício 2

Defina a função novaAgenda que cria uma agenda mutável vazia.

novaAgenda :: IO AgendaMut

5.3 Resposta - Exercício 2

novaAgenda :: IO AgendaMut
novaAgenda = do m <- newMVar M.empty
                return (AgendaMut m)

5.4 Exercício 3

Defina agora a função insere que insere um nome e um telefone:

insere :: AgendaMut -> Nome -> Numero -> IO ()
insere (AgendaMut m) nome numero = ??

5.5 Resposta - Exercício 3

insere :: AgendaMut -> Nome -> Numero -> IO ()
insere (AgendaMut m) nome numero = do
  agenda <- takeMVar m
  putMVar m (M.insert nome numero agenda)

5.6 Exercício 4

Defina a função procura que devolve uma entrada da agenda:

procura :: AgendaMut -> Nome -> IO (Maybe Numero)
procura (AgendaMut m) -> Nome = do

5.7 Resposta - Exercício 4

procura :: AgendaMut -> Nome -> IO (Maybe Numero)
procura (AgendaMut m) nome = do
  agenda <- takeMVar m
  putMVar m agenda      -- guarda de volta
  return (M.lookup nome agenda)

5.8 Agenda Mutável

Assim, podemos trabalhar com a agenda da seguinte forma:

nomes = [("Joao", "111-222"), ("Maria", "222-111"),
         ("Marcos", "333-222")]

main = do
  s <- novaAgenda
  mapM_ (uncurry (insere s)) nomes
  print =<< procura s "Marcos"
  print =<< procura s "Ana"

5.9 Exercício 5

  • O operador =<< é igual ao operador >>= mas invertido. Isto é, (<<=) = flip (>>=).

Converta a expressão para a notação do:

print =<< procura s "Marcos"

5.10 Resposta - Exercício 5

do x <- procura s "Marcos"
   print x

5.11 Agenda Mutável

Se essas funções forem utilizadas em um programa que cria múltiplas threads podemos observar algumas vantagens:

  • Durante a operação de busca não é necessário criar um lock no estado da agenda durante a operação.
  • Graças a avaliação preguiçosa, também não se faz necessário um lock por muito tempo no momento da inserção.

5.12 Experimente no Repl.it

Experimente o código dessa seção no Repl.it abaixo:

Código no Repl.it




6 Operações Assíncronas

Imagine a situação em que desejamos capturar o conteúdo de uma lista de páginas da Web. Queremos fazer isso de forma concorrente e, após o encerramento de todas as threads, quero aplicar alguma função nos resultados.

Nosso código seria algo como:

import Control.Concurrent
import Data.ByteString as B
import GetURL

main = do
  m1 <- newEmptyMVar
  m2 <- newEmptyMVar
forkIO $ do
  r <- getURL "http://www.wikipedia.org/wiki/Shovel"
  putMVar m1 r

forkIO $ do
  r <- getURL "http://www.wikipedia.org/wiki/Spade"
  putMVar m2 r
r1 <- takeMVar m1
r2 <- takeMVar m2
print (B.length r1, B.length r2)
  • A ideia é que as duas threads façam o download do conteúdo de cada URL em background assincronamente.

  • Uma operação assíncrona é uma operação que é feita em background enquanto eu posso fazer outras operações que não dependam dela, e permita que eu aguarde o final para realizar alguma operação sobre os resultados.

  • O código está muito repetitivo.

  • E se eu quiser fazer a mesma operação para uma lista de URLs? Vamos generalizar:

data Async a = Async (MVar a)
data MVar a

newEmptyMVar :: IO (MVar a)
newMVar      :: a -> IO (MVar a)
takeMVar     :: MVar a -> IO a
putMVar      :: MVar a -> a -> IO ()


async :: IO a -> IO (Async a)
async action = do
  var <- newEmptyMVar
  forkIO (do r <- action; putMVar var r)
  return (Async var)
wait :: Async a -> IO a
wait (Async var) = readMVar var

Essas funções estão definidas na biblioteca Control.Concurrent.Async

  • A função async cria uma thread a ser executada assíncronamente com outras.

  • A função wait aguarda o final da execução de uma thread assíncrona.

Dessa forma nosso código pode ser reescrito como:

import Control.Concurrent
import Data.ByteString as B
import GetURL

url1 = "http://www.wikipedia.org/wiki/Shovel"
url2 = "http://www.wikipedia.org/wiki/Spade"

main = do
  a1 <- async (getURL url1)
  a2 <- async (getURL url2)
  r1 <- wait a1
  r2 <- wait a2
  print (B.length r1, B.length r2)

Assim, podemos capturar uma lista de sites da seguinte forma:

main = do
  as <- mapM (async . getURL) sites
  rs <- mapM wait as
  mapM_ (r -> print $ B.length r) rs

6.1 Tratamento de Erro

  • E se uma das URLs não existir ou retornar algum erro?
  • Devemos jogar fora todos os resultados e exibir uma mensagem de erro?

A biblioteca async define a função:

waitCatch :: Async a -> IO (Either SomeException a)
  • waitCatch retorna ou um erro, que pode ser tratado, ou o valor esperado. Para tratar o erro devemos importar também import Control.Exception

**`Either`**

O tipo Either é definido como:

data Either a b = Left a | Right b

e assim como o Maybe é utilizado para tratamento de erro. Ele diz: “ou vou retornar algo do tipo a ou do tipo b”, sendo o tipo a geralmente tratado como o erro.

Então podemos definir:

printLen :: (Either SomeException B.ByteString) -> IO ()
printLen (Left  e) = print "URL not found"
printLen (Right b) = print $ B.length b

E então:

-- Saímos de
main = do
  as <- mapM (async . getURL) sites
  rs <- mapM wait as
  mapM_ (r -> print $ B.length r) rs

-- Para
main = do
  as <- mapM (async . getURL) sites
  rs <- mapM waitCatch as
  mapM_ printLen rs

7 Leituras complementares

  • Com isso finalizamos o assunto de Programação Concorrente nessa disciplina, embora não tenhamos esgotado todos os conceitos.

  • Para quem quiser avançar no assunto, a leitura do livro do Simon Marlow é obrigatória!

8 Disclaimer

Estes slides foram preparados para os cursos de Paradigmas de Programação e Desenvolvimento Orientado a Tipos na UFABC.

Este material pode ser usado livremente desde que sejam mantidos, além deste aviso, os créditos aos autores e instituições.