Tecnologias Java
Threads
Marcio Seiji Oyamada msoyamada@gmail.com
Pós-graduação
Conteúdo programático
• Apresentação da plataforma Java • Threads
• Construção de ambientes gráficos • Acesso a base de dados
• Sockets • RMI
• Applets
Threads
• Única linha de execução x múltiplas linhas de execução
Benefícios
• Tempo de resposta
• Compartilhamento de recursos • Economia de recursos
Muitos para um
• Várias threads no nível do usuário, mapeadas para uma única thread no nível do kernel
• Exemplos:
– Solaris Green Threads – GNU Portable Threads
Um para um
• Cada thread no nível do usuário é mapeada para uma thread no nível do kernel
• Exemplos
– Windows NT/XP/2000 – Linux
Threads em Java
• Gerenciada pela JVM
– Mapeamento para o sistema operacional depende da implementação da JVM
– Java Threads
• Classe Thread ou • Interface Runnable
Interface Runnable
• Método necessário para descrever uma thread
public interface java.lang.Runnable {
// Methods
public abstract void run();
}
• Exemplo:
class ThreadInterface implements Runnable{
public void run(){
for (int i=0; i <20;i++){
System.out.println("Thread["+Thread.currentThread(). getName()+"]="+i);
}
}
Classe Thread
• Classe principal que representa uma thread em Java
• Métodos para gerenciar threads
– Obter nome da thread – Alterar a prioridade
– Interromper uma thread – Liberar o processador
Criando uma thread com a classe
Thread
public class ThreadClasse extends Thread {
public ThreadClasse(){
super(); }
public void run(){
for (int i=0; i <20;i++){
System.out.println("Thread["+ Thread.currentThread().getName()+ "]="+i); } } }
Executando threads (Interface
Runnable)
• Utilizando a interface Runnable
public class ExecutaThread {
public static void main(String args[]) throws InterruptedException{
Thread t1; Thread t2;
t1= new Thread(new ThreadInterface()); t2= new Thread(new ThreadInterface());
t1.start(); // inicia a execução da Thread t2.start(); // inicia a execução da Thread System.out.println(“Thread inicializadas”); t1.join(); // Aguarda a thread t1 finalizar t2.join(); // Aguarda a thread t1 finalizar System.out.println(“Thread finalizadas”);
} }
Executando threads (Classe
Thread)
• Utilizando a Classe Thread
public class ExecutaThread {
public static void main(String[] args) throws InterruptedException {
// TODO code application logic here
ClasseThread t1= new ClasseThread(); ClasseThread t2= new ClasseThread();
t1.start(); t2.start(); System.out.println("Thread inicializadas"); t1.join(); t2.join(); System.out.println("Thread finalizadas"); } }
Executors
• Gerencia um conjunto de Threads
– FixedThreadPool: número fixo de threads – CachedThreadPool: aloca dinamicamente
• FixedThreadPool
– Evita o overhead com a criação de thread
– Maior controle dos recursos utilizados durante a execução do sistema
Exemplo Executors (1)
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) {
// TODO code application logic here ExecutorService executor=
Executors.newCachedThreadPool(); for (int i=0; i < 5; i++){
executor.execute(new ThreadInterface()); } System.out.println("Threads executando"); executor.shutdown(); } }
Exemplo: Executors (2)
public class ThreadInterface implements Runnable{
public void run() {
for (int i=0; i <20;i++){
System.out.println("Thread["+Thread.currentThr ead().getName()+"]="+i);
} }
Executors
• Métodos
– Execute(Interface Runnable): submete uma nova interface para ser executada
– Shutdown(): previne que outras threads sejam submetidas ao ExecutorService
– ShutdownNow(): tentar finalizar a execução das Threads
Interrompendo Threads
public class MainInterruptTest {
public static void main(String[] args) throws InterruptedException { ExecutorService executor= Executors.newCachedThreadPool();
for (int i=0; i < 10; i++)
executor.execute(new MyThread()); System.out.println("Sleeping...."); TimeUnit.SECONDS.sleep(10); executor.shutdownNow(); System.out.println("Shutdown solicitado"); } }
class MyThread implements Runnable{ public void run() {
boolean sair=false; while (!sair){
System.out.println(Thread.currentThread().getName()); if (Thread.currentThread().isInterrupted()){
System.out.println("Interrupção solicitada, finalizando a Thread"+ Thread.currentThread().getName()); sair= true; } } } }
Thread.yield()
• Para a execução da Thread atual e libera o
processador para que uma outra Thread possa executar
– Forçar a troca de contexto e conseqüentemente uma melhor distribuição do processador
public class ThreadInterface implements Runnable{ public void run() {
for (int i=0; i <10;i++){
System.out.println("Thread["+Thread.currentThread(). getName()+"]="+i); Thread.yield(); } } }
Threads:Sleep
public class SleepingTest implements Runnable { public void run() {
try {
for (int i=0; i <10; i++) {
System.out.print(“Sleep …”+Thread.currentThread().getName()) ; // Old-style: // Thread.sleep(100); // Java SE5/6-style: TimeUnit.MILLISECONDS.sleep(100); } } catch(InterruptedException e) { System.err.println("Interrupted"); } }
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool() for (int i = 0; i < 5; i++)
exec.execute(new SleepingTest()); exec.shutdown();
} }
Inicialização de atributos
public class SleepingTest implements Runnable{ boolean setYield= false;
public SleepingTest (boolean _setYield){ this.setYield= _setYield;
}
public void run() {
for (int i=0; i <10;i++){
System.out.println("Thread["+Thread.currentThread().getNam e()+"]="+i); if (setYield) Thread.yield(); } }
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool() exec.execute(new SleepingTest(true)); exec.execute(new SleepingTest(true)); exec.execute(new SleepingTest(false)); exec.execute(new SleepingTest(false)); exec.shutdown(); } }
Exercícios
1) Qual o comportamento na execução das Threads utilizando o método yield()?
2) Faça uma aplicação multithread onde cada Thread escreva na tela N vezes uma String. O valor N e a String serão passados como parâmetro pelo construtor
Threads: Retornando valores
• A interface Runnable não define um método para retornar valores
– Solução 1: utilizar a interface Callable
– Solução 2: retornar os valores em um objeto compartilhado
• Problemas de sincronização em dados compartilhados (veremos mais adiante)
Interface Callable
• Callable: generic possibilitando definir o tipo de retorno
• Método call: ponto de entrada para a Thread (ao invés do método run()), retornando o tipo definido no generic Callable
Exemplo: Callable(1)
public class ThreadCallable implements Callable<int>{
private static Random generator= new Random(); public int call(){
return generator.nextInt(1000); }
Exemplo: Callable(2)
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.ArrayList;public class MainCallable {
public static void main(String[] args) {
ExecutorService executor= Executors.newCachedThreadPool();
ArrayList<Future<Integer>> resultados= new ArrayList<Future<Integer>>(); for (int i=0; i < 10; i++)
resultados.add(executor.submit(new ThreadCallable()));
executor.shutdown();
for (int i=0; i< resultados.size(); i++){ try {
Future<Integer> result; result = resultados.get(i);
System.out.println(result.get());
} catch (InterruptedException ex) { ex.printStackTrace();
} catch (ExecutionException ex) { ex.printStackTrace();
} }
} }
Exercício
• Faça uma aplicação Java multithread para buscar um dado elemento em um vetor
(desordenado). Utilize um objeto Random para gerar numeros aleatórios
– Cada Thread ficará responsável pela busca em uma parte do vetor
– Retorne a posição do elemento no vetor ou –1 caso o elemento não foi encontrado
ThreadFactory
• Um Executor utiliza padrão de projeto Factory para criar as Threads
• O desenvolvedor pode criar sua própria ThreadFactory para criar threads
– Definir atributos específicos
– Definir prioridades durante a criação de threads – Definir manipuladores de exceção
ThreadFactory(1)
import java.util.concurrent.*;
public class MyThreadFactory implements ThreadFactory{ private static int count;
public MyThreadFactory(){ count=0;
}
public Thread newThread(Runnable r) {
Thread t= null; t = new Thread(r); count++; t.setName("MinhaThread[" + count + "]"); return t; } }
ThreadFactory(2)
Classe MyThread
public class MyThread implements Runnable { public void run() {
System.out.print(Thread.currentThread().getName()); System.out.println(Thread.currentThread().getId()); } } ----Classe principal import java.util.concurrent.*; public class MainTreadFactory {
public static void main(String[] args) {
ExecutorService executor=
Executors.newFixedThreadPool(10, new MyThreadFactory());
for (int i=0; i < 10; i++)
executor.execute(new MyThread()); executor.shutdown();
} }
Exercício
• No exercício anterior, foi utilizado um
gerenciador de Threads com número fixo. Substitua por um gerenciador dinâmico
(Executors.newCachedThreadPool(new
MyFactoryThread())
Tratando “exception”
import java.util.concurrent.*;
public class ExceptionThread implements Runnable { public void run() {
throw new RuntimeException(); }
public static void main(String [] args) {
ExecutorService exec = Executors.newCachedThreadPool() ; exec.execute(new ExceptionThread());
exec.shutdown(); }
Tratando internamente
import java.util.concurrent.*;
public class ExceptionThread implements Runnable { public void run() {
try {
throw new RuntimeException(); catch (Exception ex){
ex.printStackTrace(); }
}
public static void main(String [] args) {
ExecutorService exec = Executors.newCachedThreadPool() ; exec.execute(new ExceptionThread());
exec.shutdown(); }
Tratando “exception”(2)
/ / : concurrency/NaiveExceptionHandling.java import java.util.concurrent.*;
public class NaiveExceptionHandling {
public static void main(String[] args) { try {
ExecutorService exec= Executors.newCachedThreadPool(); exec.execute(new ExceptionThread()) ;
exec.shutdown();
} catch(RuntimeException ue) {
// Esse código não executará!!
System.out.println("Exception has been handled!"); }
} }
Registrando um tratador de
exceções
• Como “pegar” exceções que não são tratadas internamente nas Threads?
• Registrar um tratador de exceções durante a criação de uma thread (ThreadFactory)
– Interface Thread.UncaughtExceptionHandler – public void uncaughtException(Thread t,
Tratador de exceções(1)
public class MyThreadFactory implements ThreadFactory{ public Thread newThread(Runnable arg0) {
Thread t=new Thread(arg0);
t.setUncaughtExceptionHandler(new TrataException());
return t; }
}
class TrataException implements UncaughtExceptionHandler{
public void uncaughtException(Thread arg0, Throwable arg1) { System.out.println("Tratador da exceção da Thread");
} }
Tratador de exceções(2)
class MyThread implements Runnable{ public void run(){
throw new RuntimeException();
} }
public class MainUncaughtException {
public static void main(String[] args) {
ExecutorService executor=
Executors.newCachedThreadPool(new MyThreadFactory());
for (int i=0; i < 5 ; i++)
executor.execute(new MyThread()); }
Sincronização entre threads
Sincronização
• Programa concorrente
– Executado por diversos processos – Acesso concorrente a dados
• Paralelismo real x Paralelismo aparente
– Multiprocessadores: paralelismo real – Paralelismo “aparente”: concorrência
Programas concorrentes
• Processos seqüenciais que executam
concorrentemente (afetam ou são afetados por outros programas)
• Motivação
– Aumento de desempenho em máquinas multiprocessadas
– Interface com o usuário
– Sobreposição de operações de E/S e processamento
Problema produtor-consumidor
• Seja um buffer compartilhado entre dois
processos/threads. O processo produtor coloca dados em um buffer que são retirados pelo processo
consumidor
• Possível implementação
– Uma variável count armazena o número de posições preenchidas no buffer
– Inicialmente armazena o valor 0
– Quando o processo produtor coloca um dado no buffer,
count é incrementado
– Quando o processo consumidor retira o dado do buffer,
Produtor
while (true) {
/* produce an item and put in nextProduced */ while (count == BUFFER_SIZE)
; // do nothing
buffer [in] = nextProduced;
in = (in + 1) % BUFFER_SIZE; count++;
Consumidor
while (true) {
while (count == 0) ; // do nothing
nextConsumed = buffer[out];
out = (out + 1) % BUFFER_SIZE; count--;
/* consume the item in nextConsumed }
Condição de corrida
• count++ ou count—
– register= count – register= count+1 – count= register
• Considere a execução intercalada com “count = 5” inicialmente:
S0: produtor executa register1 = count {register1 = 5}
S1: produtor executa register1 = register1 + 1 {register1 = 6} S2: consumidor executa register2 = count {register2 = 5}
S3: consumidor executa register2 = register2 - 1 {register2 = 4} S4: produtor executa count = register1 {count = 6 }
Problema da seção crítica
• Condição de corrida: vários processos/thread manipulam conjuntos de dados onde o
resultado depende da ordem de execução
• Seção crítica: trecho de código onde somente um processo pode executar por vez
Solução
• Criar um protocolo que garanta a exclusão mútua
– Execução da seção crítica por somente um processo/thread
Propriedades da seção crítica
• Regra 1: Exclusão mútua • Regra 2: Progressão
– Nenhum processo fora da seção crítica pode bloquear um outro processo
• Regra 3: Espera limitada
– Nenhum processo pode esperar infinitamente para entrar na seção crítica
• Regra 4
– Nenhuma consideração sobre o número de processadores ou velocidades relativas
Classe Conta
public class Conta { int saldoPoupanca; int saldoCC;
public Conta(int _saldoPoupanca, int _saldoCC){ saldoPoupanca= _saldoPoupanca;
saldoCC= _saldoCC; }
public void transferePoupanca(int v){ saldoCC -= v;
saldoPoupanca +=v; }
public void transfereCC(int v){ saldoPoupanca -=v;
saldoCC +=v; }
public int saldoTotal(){
return (saldoPoupanca + saldoCC); }
Produtor
public class Produtor implements Runnable{ private Conta c;
private Random r= new Random(); public Produtor(Conta _c){
c= _c; }
public void run(){ while (true){ c.transfereCC(r.nextInt(1000)); c.transferePoupanca(r.nextInt(500)); } } }
Consumidor
public class Consumidor implements Runnable{ private Conta c;
private int saldoInicial;
public Consumidor(Conta _c, int _saldoInicial){ c= _c;
saldoInicial=_saldoInicial; }
public void run(){ int saldo; while (true){
saldo= c.saldoTotal();
if (saldo != saldoInicial){
System.out.println("Saldo errado = "+saldo); Thread.currentThread().yield();
} }
} }
Sincronizando acessos concorrentes
• Synchronized
– Evita a execução concorrente das threads
• Como definir a sincronização
– Métodos
• public synchronized transfereCC(int v);
• public synchronized transferePoupanca(int v); • public synchronized saldoTotal();
• Quando um método é definido como synchronized, ocorre um bloqueio, evitando a execução
– No método
Exercício
1) Altere o código da transferência de conta para que o mesmo funcione corretamente 2) Verifique o funcionamento da aplicação
SerialNumberGenerator
a) Identifique o problema
Utilizando tipos de dados
sincronizados
• AtomicInteger, AtomicLong
– boolean compareAndSet(expectedValue, updateValue);
– addAndGet (int delta) – incrementAndGet()
• AtomicReference<V>
– boolean compareAndSet(expectedValue, updateValue);
Tipos de dados sincronizados
• Os tipos Queue (fila) LinkedQueue (lista) no pacote java.util não são sincronizados • Tipos de dados thread-safe são definidos
no pacote java.util.concurrent • Interface BlockingQueue()
– Classe: ArrayBlockingQueue • void put(E e);
ArrayBlockingQueue
class Main() {
// Instanciando
private ArrayBlockingQueue<Integer> buffer= new
ArrayBlockingQueue<Integer>(5); // buffer de 5 posições }
// inserindo elementos no buffer
buffer.put(new Integer(5));
//removendo elementos do buffer
Exercício
• Implemente duas Threads, uma produtora e uma consumidora
– O Produtor deverá gerar 1000 números e colocar no buffer compartilhado para ser consumido pela Thread Consumidor
– Utilize a classe BlockingQueue como buffer compartilhado
Semáforos
• Proposto por Dijkstra(1965)
• Sincronização que não necessita de espera ativa • Semáforo S – variável inteira
• Duas operações S: acquire() e release() • Operações atômicas
Semáforo
• Binário: varia entre 0 e 1 • Contador: valor inteiro
Implementação de semáforos
• Deve garantir que dois processos não
executem acquire () e release () no mesmo semáforo ao mesmo tempo
• A implementação do acquire e release torna-se o problema de torna-seção crítica.
– Espera ativa
• Algumas aplicações podem ficar muito tempo na seção crítica.
Implementação de semáforo sem
espera ativa
• Cada semáforo tem sua fila de espera. Cada posição da fila de espera tem dois campos:
– valor (tipo inteiro)
Duas operações:
• block – coloca o processo que esta adquirindo o semáforo na fila apropriada.
• wakeup – remove o processo que esta na fila de espera e coloca na fila de prontos.
Implementação de semáforo sem
espera ativa
• Acquire()
Semafóros em Java
• Exemplo:
import java.util.concurrent.Semaphore;
public class ThreadInterface implements Runnable{
Semaphore sem= new Semaphore(1);
public void run() {
sem.acquire();
for (int i=0; i <10;i++){
System.out.println("Thread["+Thread.currentThread(). getName()+"]="+i); Thread.yield(); } sem.release(); }
public static void main(String args[]) {
ExecutorService executor= Executors.newCachedThreadPool(); for (int i=0; i < 5; i++)
executor.execute(new MyThread());
} }
Bloqueios e variáveis de condição
• Menor overhead que semáforos e synchronized • Implementação mais eficiente no Java
• Variáveis de condição são utilizadas caso seja necessário bloquear a execução no meio da
Exemplo: Lock
import java.util.concurrent.locks.*;
public class ThreadInterface implements Runnable{
Lock mutex= new Lock();
public void run() {
mutex.lock();
for (int i=0; i <10;i++){
System.out.println("Thread["+Thread.currentThread(). getName()+"]="+i); Thread.yield(); } mutex.unlock(); }
public static void main(String args[]) {
ExecutorService executor= Executors.newCachedThreadPool(); for (int i=0; i < 5; i++)
executor.execute(new MyThread());
Exemplo: Variáveis de condição
class ProducerConsumer { Lock mutex= new Lock();
Condition cond= mutex.newCondition();
static class Produtor extends Runnable{ public void run() {
while (true) { mutex.lock();
while (count == BUFFER_SIZE) cond.await();
buffer [in] = nextProduced; in = (in + 1) % BUFFER_SIZE; count++; cond.signal(); } } }
static class Consumidor extends Runnable{ public void run() {
while (true) { mutex.lock();
while (count == 0) cond.await();
nextConsumed= buffer[out]; out = (out + 1) % BUFFER_SIZE; count--; cond.signal(); } } } .. ..// método Main }
Deadlock e starvation
• Deadlock – dois ou mais processsos esperam infinitamente por eventos que somente podem ser gerados por processos no estado de espera
• Seja S e Q dois semáforos inicializados em 1
P0 P1 S.acquire(); Q.acquire(); Q.acquire(); S.acquire(); . . . . . . S.release(); Q.release(); Q.release(); S.release(); • Starvation – bloqueio indefinido.