Transacções distribuídas:
Three-Phase Commit
(3PC)
• É não-bloqueante: Todos os processos que não falham
eventualmente decidem
• Qual a condição de terminação no 2PC?
• Problema no 2PC: se coordenador falha antes de enviar
mensagem da fase 2 (resultado) a um processo correcto
– Processo correcto não pode decidir commit pois algum processo
pode ter votado não
– Também não pode decidir abort pois processo pode ter decidido
commit e posteriormente falhado
Transacções distribuídas:
Three-Phase Commit
(3PC)
• Solução: protocolo deve garantir que se um processo está
incerto, então nenhum processo pode ter decidido commit
• Criar um novo estado: Pré-Confirmar, antecede o commit
• Em caso de falha do coordenador, eleger novo
Transacções distribuídas:
3PC - diagramas de estados
Inicial
Esperar
Pré-Confirmar
-(Preparar)
Todos votos sim
(Preparar Confirmação)
Um voto não
(Abortar Global)
Inicial
Preparado
Preparar
(voto sim)
Preparar Confirmação
(Preparado)
Abortar
Global
(Feito)
Preparar
(voto não)
Coordenador
Participante
Todos preparados
Pré-Confirmar
Confirmar Global
Protocolo
Terminação
Transacções distribuídas:
3PC – Mensagens e fases
C
P
P
P
C
C
Preparar
Sim
Preparar
confirmação
Feito
P
P
P
Preparado
C
P
P
P
Confirmar
Global
Transacções distribuídas:
Tolerância a faltas no 3PC
• Timeout no Coordenador
– Estado Esperar (já não era bloqueante no 2PC)
• Aborta unilateralmente a transacção
– Estados Pré-Confirmar
• Os participantes podem não estar no Pré-Confirmar, mas pelo menos
estão no Preparado
• Sabe que nenhum pode estar no estado de abortar porque de outra
forma ele não podia estar no estado de Pré-confirmação
– E (re)envia a mensagem Confirmar Global
Transacções distribuídas:
Tolerância a faltas no 3PC
• Timeout num Participante
– Estado Inicial (já não era bloqueante no 2PC)
• Opta unilateralmente por abortar a transacção
– Estado Preparado ou Pré-Confirmar
• Depende da decisão do Coordenador que já influenciou
• Corre protocolo de terminação
Protocolo de Terminação
• Eleger novo coordenador
• Novo coordenador pergunta o estado a todos
• Se algum estiver em Commit/Abort envia
Commit/Abort a todos
• Se todos em Preparado, nenhum em Commit envia
Abort a todos
• Se todos em Pre-confirmar, nenhum em Abort envia
Commit a todos
Transacções distribuídas:
Tolerância a faltas no 3PC
• Recuperação do Coordenador
– Estados Inicial
• Pode abortar unilateralmente
– Estado Esperar
• Pode ter sido eleito um Novo Coordenador
• Se o NC estiver nos estados Esperar ou Abortar a transacção abortou
• Pergunta aos Participantes como terminou a transacção
– Estado Pré-Confirmar
• Pergunta aos Participantes como terminou a transacção
– Estados Abortar ou Confirmar
• Se ainda não recebeu todas as confirmações repete o envio da
mensagem global previamente enviada
Transacções distribuídas:
Tolerância a faltas no 3PC
• Recuperação de um Participante
– Estado Inicial
• Aborta unilateralmente a transacção
– Estado Preparado ou Pré-Confirmar
• Pergunta ao Coordenador ou aos Participantes como terminou a
transacção
Transacções em Web Services
• Visão mais abrangente: mecanismo para participantes
numa aplicação obter um consenso sobre um resultado
• Suporta vários modelos/protocolos transaccionais
– E.g., modelo ACID, 2PC, nested, business transactions, ...
• Separar a coordenação que tem sempre de existir dos
protocolos específicos de consenso
• WS-coordination: criar o contexto, definir o coordenador
e saber quem são os participantes numa actividade
WS-Coordination - modelo
• Uma aplicação é um conjunto de actividades (e.g.,
transacções, ou outras)
• O WS-coordination é uma especificação de um protocolo para
gerir as actividades que constituem uma aplicação
WS-Coordination - modelo
• Cada mensagem trocada inclui um
CoordinationContext para compreendê-la:
– Identificador único da actividade
– “Registration Service” utilizado
– Tipo de coordenação (Transacção atómica vs.
BusinessActivity)
WS-Coordination - Serviços
• Activation Service – Usado para criar transacção/
actividade, retorna CoordinationContext
• Registration Service – Usado pelos participantes
para se registarem junto do coordenador para um
protocolo
• Coordination Protocol Services – Específico a
cada protocolo de coordenação
WS - Transaction
WS - Coordination
WS - Transaction
• Dois tipos de coordenação:
• Transacção Atómica (AT)
– Usado para coordenar actividades de curta duração
– Garantem atomicidade (e.g., usando o 2PC)
– Desenhados para garantir compatibilidade com sistemas
transaccionais existentes
• Business Activity (BA)
– Actividades de longa duração
– Desenhado para uma lógica de negócio
– Desenhados para garantir compatibilidade com sistemas de
modelação de processos de negócio / workflow
Transacção Atómica (AT)
• Norma que usa o WS-Coordination, estendendo-o
para suportar transacções atómicas:
– Ao criar uma transacção atómica, criar um
CoordinationContext associado ao coordenador
– Propagar o CoordinationContext nas mensagens
– Registar os participantes nos protocolos de coordenação
– Suporta 2PC ou outros protocolos
Exemplo de uso do CoordinationContext
<?xml version="1.0" encoding="utf-8"?> <S:Envelope xmlns:S="http://www.w3.org/2001/12/soap-envelope" <S:Header> . . . <wscoor:CoordinationContext xmlns:wscoor="http://schemas.xmlsoap.org/ws/2002/08/wscoor" xmlns:wsu="http://schemas.xmlsoap.org/ws/2002/07/utility" xmlns:myApp="http://Schedule456.com/myApp"> <wsu:Identifier> http://Fabrikam123.com/SS/1234 </wsu:Identifier> <wsu:Expires>2002-08-31T13:20:00-05:00</wsu:Expires> <wscoor:CoordinationType> http://schemas.xmlsoap.org/ws/2002/08/wstx </wscoor:CoordinationType> <wscoor:RegistrationService> <wsu:Address> http://Schedule456.com/mycoordinationservice/registration </wsu:Address>< myApp:Myapp:BetaMark> ... </myApp:Myapp:BetaMark> <myApp:EBDCode> ... </myApp:EBDCode> </wscoor:RegistrationService> <myApp:IsolationLevel>RepeatableRead</myApp:IsolationLevel> </wscoor:CoordinationContext>WSDL de Participante no 2PC
<wsdl:portType name="2PCParticipantPortType"> <wsdl:operation name="Prepare"> <wsdl:input message="wstx:Prepare" /> </wsdl:operation> <wsdl:operation name="OnePhaseCommit"> <wsdl:input message="wstx:OnePhaseCommit" /> </wsdl:operation> <wsdl:operation name="Commit"> <wsdl:input message="wstx:Commit" /> </wsdl:operation> <wsdl:operation name="Rollback"> <wsdl:input message="wstx:Rollback"> </wsdl:operation> <wsdl:operation name="Unknown"> <wsdl:input message="wstx:Unknown" /> </wsdl:operation> <wsdl:operation name="Error"> <wsdl:input message="wstx:Error" /> </wsdl:operation> </wsdl:portType>WSDL de Coordenador no 2PC
<wsdl:portType name="2PCCoordinatorPortType"> <wsdl:operation name="Prepared"> <wsdl:input message="wstx:Prepared" /> </wsdl:operation> <wsdl:operation name="Aborted"> <wsdl:input message="wstx:Aborted"/> </wsdl:operation> <wsdl:operation name="ReadOnly"> <wsdl:input message="wstx:ReadOnly"/> </wsdl:operation> <wsdl:operation name="Committed"> <wsdl:input message="wstx:Committed"/> </wsdl:operation> <wsdl:operation name="Replay"> <wsdl:input message="wstx:Replay"/> </wsdl:operation> <wsdl:operation name="Unknown"> <wsdl:input message="wstx:Unknown" /> </wsdl:operation> <wsdl:operation name="Error">Transacções de Negócio
• Novo modelo transaccional
• Semelhante: acede múltiplos objectos, cada acesso
transaccional
• Diferente: Em caso falha, em vez de ocultar
resultados, executa compensação
Transacções de Negócio – Exemplo
• Agência de viagens necessita comprar bilhetes de
avião, estadia, e alugar carro para um cliente
• Cada um destes passos é um web service
exportado por entidade autónoma (diferentes
domínios administrativos)
• Processo pode ser moroso
Actividade de Negócio (Business Activity)
• Unidade de base: Actividade
• O processo de nível mais alto denomina-se
actividade de negócio
– Pode ter longa duração
– Não compatível com uso de trincos
– Permite que acções provisórias sejam expostas a outros
participantes no negócio
Compensação
• Se uma acção não for bem sucedida, executa acção
de compensação
• Por exemplo, cancelar as reservas de voo e hotel
(mesmo que, para tal, tenha de pagar uma taxa)
• Em certos casos poder-se-ia retomar a actividade
Compensação
Assume-se que t4 aborta mas que a actividade poderia continuar se repuser o
estado modificado por algumas das tarefas anteriores a t4.
Filas de Mensagens
Emissor
Emissor
Emissor
Emissor
Receptor
Receptor
Receptor
Receptor
fila
fila
Rede
Canal com fila de mensagens
Modelo do RPC ou de ligação de
transporte
Integração por Mensagens – Message Oriented
Middleware
•
A integração é feita através do encaminhamento de informação (mensagens) entre os
sistemas.
•
As aplicações recebem e enviam as mensagens para um servidor central (broker).
•
As mensagens uma vez recebidas pelo broker podem ser reformatadas, combinadas ou
modificas por forma a serem entendidas pelo sistema de destino.
•
Normalmente não é necessário modificar os sistemas envolvidos. Os Message Brokers
fornecem adaptadores para as aplicações mais comuns (SAP, Baan, PeopleSoft, etc.).
Positivo
Negativo
Mensagens Directas
(mecanismo de
transportes)
•
As API são muito simples.• Qualquer tipo de dados pode ser transmitido.
• Total separação entre os dados e o código das aplicações que tratam as mensagens
•Tudo o que está acima + • Funcionamento assíncrono permite distribuir carga e ganhar eficiência
• Permite um funcionamento desconectado da rede.
• Permite 1 para muitos e muitos
•
As aplicações tem de fazer manualmente a codificação e a descodificação dos dados•Tudo o que está acima +
•A maioria dos produtos de queuing não interoperam bem
• O assincronismo torna a programação mais difícil (programação por eventos).
Filas de
Mensagens
Message queuing
Comparação de canais com e sem fila de
mensagens
Modelo de Comunicação
• Comunicação Assíncrona
– O emissor é imediatamente desbloqueado depois da mensagem ter sido
aceite para envio (funcionamento assíncrono)
– O servidor responde também assincronamente
• Comunicação Sincrona
– O emissor só continua quando recebe um ack que a mensagem foi
recebida
• Pedido-resposta – comunicação síncrona
– Muitas interacções baseiam-se num pedido e numa resposta.
Comportamento semelhante ao da chamada remota de procedimentos
– O emissor pode especificar o endereço de resposta mas este depende da
Exemplo de MOM
Java Messages
Java Messaging Service - JMS
• Elementos da infraestrutura
– Filas de mensagens - queues
– Envio e recepção de mensagens
– Mensagens ponto a ponto
– Subscrição e publicação de mensagens
– Formato das Mensagens
Envio e Recepção de Mensagens
• As aplicações falam com o message broker através de uma connection.
• Dentro de uma connection um processo pode ter várias sessions para
cada uma das suas tarefas (threads)
• Depois estar ligado ao message broker associa-se ou cria uma queue
• Cria um message sender ou message receiver para aceder à queue
• Um processo pode enviar mensagem ou receber mensagens de uma
queue através das funções do objecto queue
• A recepção pode ser síncrona ou assíncrona
• As mensagens podem ser:
– Stream – sequência de tipos básicos de Java
– Texto – na qual se incluem documentos XML
– Objectos – objectos Java serializados
Envio das Mensagens
• Ponto a Ponto (a designação pode confundir)
– Quer dizer que a aplicação define a queue para onde envia a
mensagem.
– A aplicação pode obter a referência para a queue através de um
serviço de nomes (JNDI)
– O sistema passa automaticamente a referência para a queue de
resposta
• Publicar e Subscrever
– Neste modo as mensagens não são enviadas para um destinatários
mas para um tópico
Exemplo JMS(I)
public class Hello {
public static void main(String[] args) { try {
/* Declaração das variáveis JMS */
QueueConnectionFactory queueConnectionFactory = null; QueueConnection queueConnection = null;
Queue queue = null;
QueueSession queueSession = null; QueueSender queueSender = null; QueueReceiver queueReceiver = null; TextMessage textMessage = null; Message message = null;
/* Declaração de variáveis para argumentos da linha de comando */ final String MQ_HOST_NAME;
Exemplo JMS(II)
System.out.println("Usage: java Hello <mq_host_name> <mq_host_port>"); System.exit(1);
}
MQ_HOST_NAME = args[0]; MQ_HOST_PORT = args[1];
System.out.println("Message queue host is " + MQ_HOST_NAME + ":" + MQ_HOST_PORT); /* Instanciação de uma fábrica de ligações */
/* Instancia-se directamente a classe Sun MQ para poder usar os métodos para definir nome e porto do servidor de mensagens */
com.sun.messaging.QueueConnectionFactory sunQueueConnectionFactory = new com.sun.messaging.QueueConnectionFactory();
sunQueueConnectionFactory.setProperty("JMQBrokerHostName", MQ_HOST_NAME); sunQueueConnectionFactory.setProperty("JMQBrokerHostPort", MQ_HOST_PORT); queueConnectionFactory = sunQueueConnectionFactory;
/* Criação de uma ligação ao servidor de mensagens */
Exemplo JMS(III)
queueSession = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
/* Instanciação de uma fila de mensagens com o nome especificado */
/* A fila é criada implicitamente no servidor, caso não exista */
queue = new com.sun.messaging.Queue("world");
/* Criação do produtor de mensagens */
queueSender = queueSession.createSender(queue);
/* Criação e envio de uma mensagem */
textMessage = queueSession.createTextMessage();
textMessage.setText("Hello World");
System.out.println("Sending Message: " + textMessage.getText());
queueSender.send(textMessage);
/* Neste exemplo muito simples é o mesmo programa a enviar e a receber a mensagem
*/
Indica que a Session não é Transaccional
Exemplo JMS(IV)
/* Reiniciação da ligação */ queueConnection.start();
/* Recepção de mensagem e análise do conteúdo */ message = queueReceiver.receive();
if (message instanceof TextMessage) { textMessage = (TextMessage) message;
System.out.println("Read Message: " + textMessage.getText()); }
/* Fecho da sessão e da ligação */ queueSession.close();
queueConnection.close(); } catch (Exception e) {
/* À falta de melhor tratamento de erro, é conveniente imprimir a excepção */ System.out.println("Exception occurred : " + e.toString());
Exemplo: receptor Assíncrono (I)
public class AsynchReceiver {
public static void main(String[] args) { int exitResult = 0;
String queueName = null;
/* Declaração das variáveis JMS */
QueueConnectionFactory queueConnectionFactory = null; QueueConnection queueConnection = null;
QueueSession queueSession = null; Queue queue = null;
QueueReceiver queueReceiver = null; TextListener textListener = null;
/* Declaração de variáveis para argumentos da linha de comando */ final String MQ_HOST_NAME;
final String MQ_HOST_PORT;
/* Validação dos argumentos recebidos na linha de comando */ if (args.length != 3) {
Exemplo: receptor Assíncrono (II)
MQ_HOST_NAME = args[0]; MQ_HOST_PORT = args[1];
System.out.println("Message queue host is " + MQ_HOST_NAME + ":" + MQ_HOST_PORT); queueName = new String(args[2]);
System.out.println("Queue name is " + queueName);
/* Criar uma fábrica de ligações, criar uma ligação,
criar uma sessão a partir da ligação (false indica que não é transaccional), criar uma fila de mensagens */
try { com.sun.messaging.QueueConnectionFactory sunQueueConnectionFactory = new com.sun.messaging.QueueConnectionFactory(); sunQueueConnectionFactory.setProperty("JMQBrokerHostName", MQ_HOST_NAME); sunQueueConnectionFactory.setProperty("JMQBrokerHostPort", MQ_HOST_PORT); queueConnectionFactory = sunQueueConnectionFactory;
Exemplo: receptor Assíncrono (III)
} catch (Exception e) {
System.out.println("Connection problem: " + e.toString()); if (queueConnection != null) {
try {
queueConnection.close(); } catch (JMSException ee) { }
}
System.exit(1); }
/* Criar consumidor de mensagens,registar o tratamento de mensagens (TextListener) e iniciar a recepção de mensagens.
O 'listener' escreve as mensagens obtidas. O programa fica bloqueado até o 'listener' receber a mensagem final e efectuar o desbloqueio. */
try {
queueReceiver = queueSession.createReceiver(queue); textListener = new TextListener();
queueReceiver.setMessageListener(textListener);
/* Iniciar a ligação */ queueConnection.start();
Exemplo: receptor Assíncrono (IV)
/* Quando não houver mais nada para fazer,
vai bloquear-se para esperar o fim da recepção de mensagens */
textListener.monitor.waitTillDone();
} catch (JMSException e) {
System.out.println("Exception occurred: " + e.toString()); exitResult = 1; } finally { if (queueConnection != null) { try { queueConnection.close(); } catch (JMSException e) { exitResult = 1; } } }
Função que trata as mensagens assíncronas
public class TextListener implements MessageListener {
final DoneLatch monitor = new DoneLatch();
/** método invocado no Listener de mensagens quando chega uma mensagem nova */
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage msg = (TextMessage) message; try {
System.out.println("Reading message: " + msg.getText()); } catch (JMSException e) {
System.out.println("Exception in onMessage(): " + e.toString()); }
} else {
/* A mensagem que não é de texto indica o fim da sequência. Acorda quem esteja à espera */