Saturday, May 26, 2012

Summary of "Your Mouse is a Database" - May 2012 CACM

In a typical web application, when you make an Ajax call to a web service, you have to wait for the entire response to be received before you can act on it. You cannot process the data as it's coming in off the wire. The article "Your Mouse is a Database" in the May 2012 issue of CACM by Erik Meijer explores a programming API called Reactive Extensions that can asynchronously stream large or infinite amounts of data to an application (think of a stream of Twitter tweets--new tweets are always being created, so there's never an end to the data). Two key words here are "asynchronous" and "stream". Asynchronous means that the data is handled in a separate thread so the application's UI does not lock up. And stream means that the data is acted on as it's being received.

The term "data" is used very broadly--it doesn't have to be data from a web service over the Internet. The data can also be UI input from the user, such as mouse movements or characters typed into a text box. The data is push-based because it is sent to the consumer instead of the consumer having to explicitly request it.

To help explain the concept, Meijer proposes modifying Java's Future interface (an interface used to check on the status of threads that are running in the background) to handle such data.

interface Future<T> {
  Closeable get(Observer<T> callback);
}

He slims the Future<T> interface down to just one method. The get method allows a consumer to subscribe to the data stream. The return value for the method is an instance of Closeable, which allows the programmer to cancel that particular subscription if she wishes.

The Observer<T> parameter processes data from the stream asynchronously. Meijer bases the Observer<T> interface on GWT's AsyncCallback interface by giving it onFailure() and onSuccess() methods. onFailure() is called if there's an error retrieving data from the stream. onSuccess() is called when (or if) the stream ends. The third method, onNext(), defines how to handle each data item from the stream (like saving it to a database or displaying it on the screen).

interface Observer<T> {
  void onFailure(Throwable t);
  void onSuccess();
  void onNext(T value);
}
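
To make these two interfaces concrete, here is a minimal sketch of how they might fit together. It is my own illustration, not code from the article. The names PushStream, ListStream, and PushDemo are hypothetical (I renamed the article's Future<T> to PushStream<T> only to avoid a clash with java.util.concurrent.Future), and the stream pushes a fixed list synchronously instead of asynchronously to keep the example short.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

// The article's Observer<T> interface.
interface Observer<T> {
  void onFailure(Throwable t);
  void onSuccess();
  void onNext(T value);
}

// The article's one-method Future<T>, under a hypothetical name to avoid
// clashing with java.util.concurrent.Future.
interface PushStream<T> {
  Closeable get(Observer<T> callback);
}

// Hypothetical stream that pushes a fixed list of items to each subscriber.
class ListStream<T> implements PushStream<T> {
  private final List<T> items;

  ListStream(List<T> items) {
    this.items = items;
  }

  @Override
  public Closeable get(Observer<T> callback) {
    try {
      for (T item : items) {
        callback.onNext(item);
      }
      callback.onSuccess();
    } catch (RuntimeException e) {
      callback.onFailure(e);
    }
    // Every item has already been pushed, so there is nothing left to cancel.
    return () -> { };
  }
}

class PushDemo {
  // Subscribes to a small stream and records every callback it receives.
  static List<String> run() {
    List<String> log = new ArrayList<>();
    PushStream<String> tweets = new ListStream<>(List.of("first", "second"));
    tweets.get(new Observer<String>() {
      @Override public void onFailure(Throwable t) { log.add("failed: " + t); }
      @Override public void onSuccess() { log.add("done"); }
      @Override public void onNext(String value) { log.add("next: " + value); }
    });
    return log;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

The consumer never asks for the next item. It hands over its callbacks once and the stream calls them, which is the push-based part.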

Meijer then describes a number of query operators in the Reactive Extensions library that can be used to filter this streaming data. For instance, the "where" operator lets the programmer specify the criteria for whether or not a data item should be processed. If the data item does not meet the criteria, then it is discarded. Another operator is "throttle", which prevents too much data from being processed too quickly. For example, if the throttle value is set to 2 seconds and 10 data items are pushed within a 2 second time span, it will only process the most recent item within that 2 second time span. The other 9 items will be ignored. These operators can be chained together to give the programmer strong control over how to filter a stream.
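
Here is a rough sketch of what these two operators do. It is not the Reactive Extensions API: the class and method names are hypothetical, and the operators are replayed over a recorded list of timestamped items instead of a live stream so that the result is deterministic. I am also assuming that "throttle" keeps an item only when no newer item follows it within the window, which matches the 2-second example above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class StreamOperators {
  // A data item tagged with the time (in milliseconds) at which it was pushed.
  static final class Timed {
    final long millis;
    final String value;

    Timed(long millis, String value) {
      this.millis = millis;
      this.value = value;
    }
  }

  // "where": keep only the items whose value meets the criteria.
  static List<Timed> where(List<Timed> items, Predicate<String> criteria) {
    List<Timed> kept = new ArrayList<>();
    for (Timed item : items) {
      if (criteria.test(item.value)) {
        kept.add(item);
      }
    }
    return kept;
  }

  // "throttle" (assumed semantics): keep an item only if no newer item
  // follows it within windowMillis. Items must be sorted by time.
  static List<Timed> throttle(List<Timed> items, long windowMillis) {
    List<Timed> kept = new ArrayList<>();
    for (int i = 0; i < items.size(); i++) {
      boolean isLast = (i == items.size() - 1);
      if (isLast || items.get(i + 1).millis - items.get(i).millis >= windowMillis) {
        kept.add(items.get(i));
      }
    }
    return kept;
  }

  // Chains the two operators and returns the values that survive.
  static List<String> demo() {
    List<Timed> pushed = List.of(
        new Timed(0, "a"), new Timed(500, "spam"),
        new Timed(900, "b"), new Timed(5000, "c"));
    List<String> result = new ArrayList<>();
    for (Timed item : throttle(where(pushed, v -> !v.equals("spam")), 2000)) {
      result.add(item.value);
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

In the demo, "spam" is dropped by where, and "a" is dropped by throttle because "b" arrives 900 ms after it, so only "b" and "c" survive.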

This idea of streaming push-based data can help developers design more memory-efficient applications. It can also help developers filter the data from larger streams, such as Twitter, so as not to overwhelm the application with data it doesn't need.

Sunday, May 20, 2012

Summary of "Idempotence Is Not a Medical Condition" - CACM, May 2012

I wanted to read the article "Idempotence Is Not a Medical Condition" by Pat Helland in the May 2012 issue of CACM because it smelled like an article that was full of big words and fuzzy architecture abstractions. I learned some things from the article (and I really like the title), but it's mostly full of FUD. The gist of the article is: "Are you SURE that the messages you send over the network are delivered successfully? Are you REALLY sure? I mean, are you SUPER DUPER sure? After all, how can we be certain of anything in this crazy world?" By the way, SQL Server Broker eliminates this uncertainty for you.


In this article, Helland says that distributed systems today are largely composed of a collection of off-the-shelf software and cloud-based Internet services. This means that they lack a centrally-enforced communication policy. So, extra care must be taken to ensure that each message is delivered and that no message is lost, even if one of the sub-systems goes down or restarts.

This contrasts with the traditional notion of a distributed system: a cluster of tightly-coupled computers in a lab somewhere, which are specifically programmed to talk only to each other. Communication is simpler in this situation because each sub-system is essentially identical and also physically present in the same location.

Every sub-system has some sort of "plumbing" which sends and receives messages (e.g. a TCP network stack). If the plumbing is able to do things like handle duplicate messages and resend lost messages, then the system's application layer does not have to be programmed to ensure reliable communication. But if the plumbing cannot do these things, then the application layer is responsible.

A message should only be marked as "consumed" if the action that the receiving application performs on the message is successful. For example, if an application tries to save a message it receives to a database, but the database transaction fails, then the message should not be considered "consumed" because the operation that the message invoked (a database call) was not successful. The message should sit in a queue somewhere until the application is able to successfully process it or it should respond to the client saying that it failed the operation.
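
As a small sketch of this rule (my own illustration, with hypothetical class and method names), the receiver below looks at the message at the head of a queue, runs the handler, and only removes the message if the handler finishes without throwing. The "database" in main is just an in-memory list standing in for a real one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

class ReliableConsumer {
  // Processes the message at the head of the queue. The message is removed
  // ("consumed") only if the handler returns without throwing.
  static boolean processNext(Deque<String> queue, Consumer<String> handler) {
    String message = queue.peek();
    if (message == null) {
      return false; // nothing to process
    }
    try {
      handler.accept(message);
    } catch (RuntimeException e) {
      return false; // handler failed: leave the message queued for a retry
    }
    queue.remove();
    return true;
  }

  public static void main(String[] args) {
    Deque<String> queue = new ArrayDeque<>(List.of("order-1"));
    List<String> database = new ArrayList<>();
    int[] attempts = {0};
    // Hypothetical handler that simulates a failed transaction on the first try.
    Consumer<String> save = message -> {
      if (attempts[0]++ == 0) {
        throw new IllegalStateException("simulated transaction failure");
      }
      database.add(message);
    };
    System.out.println(processNext(queue, save) + " queue=" + queue);
    System.out.println(processNext(queue, save) + " queue=" + queue);
  }
}
```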

Helland also states that messages should be idempotent, meaning that if the same message is sent multiple times, the effect it has on the server should be the same as if just one message was sent. The term is often used in the context of HTTP--GET requests are defined as being idempotent, while POST requests are not.
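
One common way to get this property for an operation that is not naturally idempotent is to give every message a unique ID and have the receiver remember which IDs it has already applied. The sketch below is my own illustration and not code from the article. The class name, the message IDs, and the deposit operation are all hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

class IdempotentAccount {
  private final Set<String> processedIds = new HashSet<>();
  private long balance = 0;

  // Applies a deposit at most once per message ID. A resent message with an
  // ID that was already applied is ignored, so retries are harmless.
  boolean deposit(String messageId, long amount) {
    if (!processedIds.add(messageId)) {
      return false; // duplicate: already applied
    }
    balance += amount;
    return true;
  }

  long balance() {
    return balance;
  }

  public static void main(String[] args) {
    IdempotentAccount account = new IdempotentAccount();
    account.deposit("msg-1", 100);
    account.deposit("msg-1", 100); // the same message, delivered again
    System.out.println(account.balance());
  }
}
```

Sending "msg-1" twice leaves the balance at 100, the same as sending it once.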

Messages can have a preference for one of two types of behavior: "history" or "currency". History means that the messages must be delivered in order. If message N+1 arrives before message N, the receiver must hold message N+1 until message N arrives, and only then deliver both to the application. Downloading a file requires this type of communication because all of the file's data must be delivered in order or else you'll get a corrupt file. Currency means that getting the most recent message is what matters. If some messages are skipped or lost, that's OK. Getting the most recent price of a stock is one such example of this behavior type.
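
The difference between the two behaviors can be sketched with two tiny receivers. This is my own illustration with hypothetical class names, and it assumes that every message carries a sequence number starting at 1.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DeliveryPolicies {
  // "History": releases messages to the application strictly in sequence order.
  static class InOrderBuffer {
    private final Map<Integer, String> pending = new HashMap<>();
    private final List<String> delivered = new ArrayList<>();
    private int nextSeq = 1;

    void receive(int seq, String payload) {
      if (seq < nextSeq) {
        return; // already delivered, ignore the duplicate
      }
      pending.put(seq, payload);
      // Deliver as many consecutive messages as are now available.
      while (pending.containsKey(nextSeq)) {
        delivered.add(pending.remove(nextSeq));
        nextSeq++;
      }
    }

    List<String> delivered() {
      return delivered;
    }
  }

  // "Currency": remembers only the most recent message. Older ones are dropped.
  static class LatestValue {
    private int latestSeq = 0;
    private String latest = null;

    void receive(int seq, String payload) {
      if (seq > latestSeq) {
        latestSeq = seq;
        latest = payload;
      }
    }

    String latest() {
      return latest;
    }
  }

  public static void main(String[] args) {
    InOrderBuffer file = new InOrderBuffer();
    file.receive(2, "chunk-2"); // held back until chunk-1 arrives
    file.receive(1, "chunk-1");
    System.out.println(file.delivered());

    LatestValue price = new LatestValue();
    price.receive(1, "10.00");
    price.receive(3, "10.50");
    price.receive(2, "10.20"); // stale, ignored
    System.out.println(price.latest());
  }
}
```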

Helland says that the greatest moment of uncertainty during the request/response cycle is right after the request is sent and the application is waiting for a response. The assumption is that the message has been received and is being processed, but you don't know this for sure. For all you know, the receiving end could have decided to ignore the request and not send a response. It's for this reason that timeouts are needed. If the requester has not received a response within the timeout period, then it will assume the request was not processed and will stop waiting for a response.
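
In Java, a timeout like this might be sketched as follows. This is my own example and not something from the article. CompletableFuture stands in for whatever transport eventually delivers the response, and the class and method names are hypothetical.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class RequestTimeout {
  // Waits up to timeoutMillis for a response. An empty result means the
  // requester gave up waiting, not that the request was never processed.
  static Optional<String> awaitResponse(CompletableFuture<String> response, long timeoutMillis) {
    try {
      return Optional.of(response.get(timeoutMillis, TimeUnit.MILLISECONDS));
    } catch (TimeoutException e) {
      return Optional.empty(); // no reply within the timeout period
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt flag
      return Optional.empty();
    } catch (ExecutionException e) {
      throw new IllegalStateException("request failed", e.getCause());
    }
  }

  public static void main(String[] args) {
    // A response that never arrives: the requester stops waiting after 100 ms.
    System.out.println(awaitResponse(new CompletableFuture<>(), 100));
    // A response that has already arrived.
    System.out.println(awaitResponse(CompletableFuture.completedFuture("200 OK"), 100));
  }
}
```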

However, there is one thing that you can know for certain, Helland says: "Each message is guaranteed to be delivered zero or more times!" Finally, something I can rely on!

Helland says that despite TCP's robustness and widespread use, it cannot fully be trusted to deliver messages reliably. He says that TCP "offers no guarantees once the connection is terminated or one of the processes completes or fails." I don't understand what he's trying to say. Of course it offers no guarantees once the connection is terminated. You can't send any messages over a terminated connection. And of course it offers no guarantees once one of the processes completes or fails. At that point, the TCP conversation is over.

He then goes on to say that "challenges arise when longer-lived participants [such as HTTP requests] are involved." He says that when a persistent HTTP connection is needed, the TCP connection is usually kept alive, but there's no guarantee that this will happen. Because of this, the HTTP request may have to be sent multiple times.

Helland then goes into an in-depth discussion about idempotency. He says that, technically, no idempotent request is truly idempotent because every request has some lasting effect on the server. For instance, most servers keep an access log of every request that was received. Making five identical requests, even if they are idempotent, will add five entries to the log file. Also, the performance of the server is impacted every time it receives a request. The more requests it receives, the more its performance will degrade. However, these side-effects are not related to how the actual application logic behaves, which is the true context in which the term "idempotent" should be used.

Helland states that state-ful communication is more difficult than state-less communication because all previously sent messages must be taken into consideration when processing the current message.

He also points out that there's no way of knowing whether the server that receives a request is doing the actual work to fulfill that request. The server could be forwarding the request to another server to do the actual work.

Helland says that if a dialog between two servers breaks apart in the middle of the conversation, both ends must be able to cleanly recover from this failure.

If a service is load-balanced across multiple servers, then state-ful information must be stored in such a way so that a client's state is not lost. One way to do this is to store the state information on a designated server that the other servers have access to. That way, requests from the same client can be handled by any server in the cluster.

Alternatively, the client could be assigned to a designated server in the cluster by a load-balancer when it makes its first request. This server will then be responsible for maintaining the state information for this client and handling all of the client's requests. The first request that the client makes must be idempotent because if the request is received successfully, but the response from the server is lost, then the client will assume that the request was lost and try making the request again. When the request is sent for the second time, the load-balancer may assign the request to a different server. If the request is not idempotent, then the request will be applied twice, thus tainting the server's data. Once the client receives a server response to its first request, it now knows which server in the cluster it should communicate with, which means that all subsequent messages don't have to be idempotent because there's no risk of sending the same request to multiple servers.

Helland says there are three ways to make this first client request idempotent. (1) You can send basically an empty message to the server, such as a TCP SYN message, (2) you can perform a read-only operation, or (3) you can have the server queue a non-idempotent operation, which the server executes only once the connection has been confirmed. Approach (1) is the simplest, but approaches (2) and (3) can be seen as more efficient because they are performing a useful operation. Or, in Helland's words: "allowing the application to do useful work with the round-trip is cool."

The last point Helland makes is that the last message of a conversation cannot be guaranteed. This is because, if you were to send a response to the last message stating that you have received it, then it wouldn't be the last message! Therefore, applications must be designed so that it is not important whether the last message is received or not.

Friday, May 18, 2012

Database Migration Scripts

I recently added database migration functionality to my Sleet SMTP project. This means that, if I release a new version of the application that includes a change to the database schema, the existing databases of deployed applications will be migrated to the new schema automatically. Before, you would have had to wipe the database completely or apply the schema changes manually, so this is a big improvement.

The way it works is as follows. I created a table in the database whose sole purpose is to store the schema version of the database. This is just an integer that starts at "1" and increments every time the schema changes. The source code also contains a version number, which is the schema version that the source code is programmed to use. When Sleet starts up, it compares the version number in the database with the version number in the source code to determine if the schema is out of date.

If the schema is out of date, it runs a series of migration scripts. Each migration script contains the SQL code necessary to migrate the database from one version to the next. For example, if the latest database schema version is "4", then the application will contain three migration scripts: 1-to-2, 2-to-3, and 3-to-4. By chaining these scripts together, the database schema can be updated no matter what version it currently is. For example, if the schema version of my database is "2", it will first execute the 2-to-3 migration script and then execute the 3-to-4 migration script. If it's "3", then it will just execute the 3-to-4 script. If it's "1", then it will execute all of them. All of this is done within a database transaction, so if something goes wrong during the migration process, the database will be restored to its previous state.

The pseudo-code below shows how this is done.

//connect to the database
Connection db = ...
db.setAutoCommit(false);

int schemaVersion = 4;
int curSchemaVersion = //"SELECT db_schema_version FROM sleet"
if (curSchemaVersion < schemaVersion) {
  //schema is outdated, run the migration script(s)
  Statement statement = db.createStatement();
  while (curSchemaVersion < schemaVersion) {
    String script = "migrate-" + curSchemaVersion + "-" + (curSchemaVersion + 1) + ".sql";
    SQLStatementReader in = new SQLStatementReader(new InputStreamReader(getClass().getResourceAsStream(script)));
    String sql;
    while ((sql = in.readStatement()) != null) {
      statement.execute(sql);
    }
    curSchemaVersion++;
  }

  //update the version number in the database
  //"UPDATE sleet SET db_schema_version = [schemaVersion]"

  //commit the transaction
  db.commit();
}
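
The script-chaining step can also be looked at on its own, without a database. The sketch below is a simplified illustration and not the actual Sleet code: the MigrationPlan class and scriptsToRun method are hypothetical names, and only the "migrate-N-M.sql" file naming comes from the pseudo-code above.

```java
import java.util.ArrayList;
import java.util.List;

class MigrationPlan {
  // Lists the scripts needed to go from currentVersion to targetVersion, in
  // the order they must run. Returns an empty list if nothing is out of date.
  static List<String> scriptsToRun(int currentVersion, int targetVersion) {
    List<String> scripts = new ArrayList<>();
    for (int v = currentVersion; v < targetVersion; v++) {
      scripts.add("migrate-" + v + "-" + (v + 1) + ".sql");
    }
    return scripts;
  }

  public static void main(String[] args) {
    // A database at version 2 with source code at version 4.
    System.out.println(scriptsToRun(2, 4));
  }
}
```

For a database at version 2 and source code at version 4, this gives migrate-2-3.sql followed by migrate-3-4.sql, which matches the example described earlier.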