Query Planning & Execution

The following documents how the system processes query requests from transactions and executes them on the appropriate partitions.

When a transaction is running, the procedure’s control code can queue SQLStmts in a batch and then issue that batch to the system for execution. For example, the transaction shown below from YCSB will queue two invocations of the readStmt query in a batch and then have the DBMS execute them using voltExecuteSQL. The call to voltExecuteSQL will block the transaction’s control code until the results are retrieved and returned.

import org.voltdb.*;

// Simplified YCSB-style procedure (class name is illustrative)
public class ReadRecords extends VoltProcedure {
    public final SQLStmt readStmt = new SQLStmt("SELECT * FROM USERTABLE WHERE YCSB_KEY=?");

    public VoltTable[] run(long id0, long id1) {
        voltQueueSQL(readStmt, id0);   // queue the first invocation of readStmt
        voltQueueSQL(readStmt, id1);   // queue the second invocation of readStmt
        return voltExecuteSQL(true);   // block until both result sets are returned
    }
}

Several things need to happen in order to execute these queries at the partitions that have the data they need and to get the results back to the transaction. The easiest approach would be to send every query to all partitions and look for the data there, but this would require each transaction to lock the entire cluster every time (which would be horribly slow). Instead, H-Store examines each query and its input parameters to determine exactly where to send it.

It is important to note that H-Store does not do any query optimizations at runtime (e.g., predicate pushdowns). All of this is done when the project is compiled using hstore-prepare.

We now describe these steps in detail.

Computing Query Target Partitions

After the transaction invokes voltExecuteSQL, the underlying logic in VoltProcedure computes a unique hash identifier of all of the SQLStmts in the current batch. It then retrieves (or creates) a Batch Planner instance that is specific to that identifier at that partition. The Batch Planner is responsible for examining each SQLStmt invocation and its input parameters in the batch to determine what partition(s) the query needs to be executed on. Note that each Batch Planner is not shared between partitions, and thus it does not need any synchronization primitives.
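
The caching described above can be pictured as a per-partition map keyed by the batch’s hash identifier. The sketch below is a simplification with illustrative names; the real identifier is computed inside VoltProcedure and the cached object is a BatchPlanner. Because each partition keeps its own cache, a plain HashMap with no locking suffices.

import java.util.*;

// Simplified sketch of a per-partition planner cache keyed by a batch hash (illustrative names).
public class PlannerCacheSketch {
    // One instance of this cache exists per partition, so no synchronization is required.
    private final Map<Integer, Object> planners = new HashMap<>();  // batchHash -> planner stand-in

    Object getOrCreatePlanner(String[] sqlInBatch) {
        int batchHash = Arrays.hashCode(sqlInBatch);  // identifier for this combination of SQLStmts
        return planners.computeIfAbsent(batchHash, h -> new Object() /* stand-in for a new BatchPlanner */);
    }
}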

Each SQLStmt is linked to a Statement catalog object that contains the pre-compiled query plans. Each Statement has a separate query plan for single-partition execution and multi-partition execution. Thus, it is the Batch Planner’s job to determine which one it needs to use for each invocation. Although the two plans are logically the same, the multi-partition plan will have special nodes to transmit intermediate data between partitions and other optimizations needed for distributed query execution (e.g., computing AVG() by taking the weighted average from each partition).
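
To see why AVG() needs this special handling, consider combining per-partition partial results. The stand-alone sketch below (with made-up numbers) shows that the coordinating partition must sum the per-partition SUMs and COUNTs rather than average the per-partition averages:

// Worked example of the AVG() optimization: each partition returns a partial SUM and COUNT,
// and the coordinating partition combines them.
public class DistributedAvgSketch {
    public static void main(String[] args) {
        long[] partialSums   = { 120, 45, 300 };   // SUM(col) computed on partitions 0..2
        long[] partialCounts = {  10,  5,  20 };   // COUNT(col) computed on partitions 0..2

        long totalSum = 0, totalCount = 0;
        for (int p = 0; p < partialSums.length; p++) {
            totalSum   += partialSums[p];
            totalCount += partialCounts[p];
        }
        // Correct global average: 465 / 35 ≈ 13.29. Averaging the per-partition averages
        // (12.0, 9.0, 15.0) would instead give 12.0, which is wrong.
        System.out.println((double) totalSum / totalCount);
    }
}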

The main logic for determining where to send queries is in BatchPlanner.plan(). There, it iterates over each SQLStmt and uses the PartitionEstimator to determine what partitions it needs to touch. Each SQLStmt is initially assumed to be single-partitioned, and thus the Batch Planner will have the PartitionEstimator examine that SQLStmt’s single-partition query plan. If the PartitionEstimator determines that the query needs to touch more than one partition, then the Batch Planner switches to the multi-partition query plan for that SQLStmt. Note that this is separate from checking whether a query needs to execute on the base partition for the transaction’s control code; a query can be single-partitioned even though it is executing on a remote partition.
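
A greatly simplified view of that decision, assuming a table partitioned on a single column and a query with an equality predicate on that column, could look like the sketch below. The names and the hash function are illustrative; the real PartitionEstimator walks the pre-compiled query plan and the catalog.

import java.util.*;

// Sketch: if the query supplies a value for the partitioning column, it maps to exactly one
// partition; otherwise it could touch any partition and the multi-partition plan is needed.
public class PartitionEstimatorSketch {
    static Set<Integer> estimatePartitions(Long partitioningParam, int numPartitions) {
        if (partitioningParam != null) {
            int partition = Math.floorMod(partitioningParam.hashCode(), numPartitions);  // illustrative hash
            return Collections.singleton(partition);   // single-partition plan can be used
        }
        Set<Integer> all = new HashSet<>();
        for (int p = 0; p < numPartitions; p++) {
            all.add(p);                                // no usable predicate -> all partitions
        }
        return all;
    }
}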

After this process is complete, each SQLStmt in the batch will have a mapping of the PlanFragments that need to be executed on its behalf and the set of partitions that each of them needs to be sent to. The Batch Planner then determines how the system will need to execute the batch. There are three possibilities (see the sketch after this list):

  1. If all of the queries are “local” (i.e., they only need to execute on the transaction’s base partition), then the Batch Planner sends the queries directly to that partition’s PartitionExecutor for immediate execution. This is the fastest and most efficient execution path in the system.
  2. If one or more of the queries need to access a partition different than the transaction’s base partition, then the BatchPlanner will generate a dependency graph of WorkFragments that the system will schedule for execution.
  3. If at least one of the queries needs to access a partition that the transaction did not acquire the lock for before it started, then a MispredictionException is thrown and the transaction is aborted. The transaction will be requeued and restarted with all of the locks for the partitions that it attempted to touch.
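
The sketch below condenses the three outcomes into plain partition-id sets. The names are illustrative; the actual decision is made inside BatchPlanner.plan() on its own data structures.

import java.util.Set;

// Sketch of the three-way outcome of planning a batch.
public class BatchOutcomeSketch {
    enum Outcome { EXECUTE_LOCALLY, DISPATCH_DISTRIBUTED, MISPREDICTION_ABORT }

    static Outcome classify(int basePartition,
                            Set<Integer> partitionsTouchedByBatch,
                            Set<Integer> partitionsLockedByTxn) {
        if (!partitionsLockedByTxn.containsAll(partitionsTouchedByBatch)) {
            // Case 3: the batch touches a partition the transaction never locked -> abort and restart
            return Outcome.MISPREDICTION_ABORT;
        }
        if (partitionsTouchedByBatch.size() == 1
                && partitionsTouchedByBatch.contains(basePartition)) {
            // Case 1: everything is local to the base partition -> fastest path
            return Outcome.EXECUTE_LOCALLY;
        }
        // Case 2: build a dependency graph of WorkFragments and dispatch them
        return Outcome.DISPATCH_DISTRIBUTED;
    }
}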

We now discuss the second case, where queries need to be distributed to multiple partitions.

Generating WorkFragments

Each Plan Fragment for a query contains a list of unique input and output dependency identifiers. An output dependency represents the VoltTable that is generated by a Plan Fragment when it is executed in the C++ execution engine. An input dependency for a Plan Fragment is an output dependency of another Plan Fragment that must be executed first. The Batch Planner uses this dependency information to generate a dependency graph (e.g., BatchPlanner.PlanGraph) for each SQLStmt that represents the order in which its PlanFragments need to be executed.
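
In other words, each planned fragment carries one output dependency identifier and zero or more input dependency identifiers. A minimal, illustrative representation (not the actual catalog class) could be:

import java.util.List;

// Each fragment produces exactly one output table and may consume tables produced by others.
record PlannedFragment(long planFragmentId,
                       int outputDependencyId,               // id of the VoltTable this fragment produces
                       List<Integer> inputDependencyIds) { } // outputs that must exist before it can run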

The dependency graph is then used by the Batch Planner (i.e., BatchPlanner.createWorkFragmentsBuilders()) to generate all of the Protocol Buffer WorkFragment messages that are sent to the partitions in order to execute the PlanFragments. The Batch Planner leaves the WorkFragments in their intermediate “Builder” form so that the PartitionExecutor can perform additional optimizations later on.

Each WorkFragment contains a series of PlanFragment identifiers that need to be executed at a partition in a given “round”. A round represents all of the PlanFragments that can be executed once the previous round has finished (i.e., all of the input dependencies needed by that round have been generated by the previous round). If the PlanFragments do not have any input dependencies, then they will be in the first round and can be scheduled for execution immediately.
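
A hedged sketch of grouping fragments into rounds by their dependencies is shown below. The data structures and names are illustrative; the real grouping is performed by the Batch Planner over its PlanGraph.

import java.util.*;

// Sketch: a fragment joins a round once every one of its input dependencies was produced
// by a fragment in an earlier round.
public class RoundSketch {
    static List<List<Long>> computeRounds(Map<Long, Set<Integer>> inputDeps,   // fragmentId -> input dependency ids
                                          Map<Long, Integer> outputDep) {      // fragmentId -> output dependency id
        List<List<Long>> rounds = new ArrayList<>();
        Set<Long> remaining = new HashSet<>(inputDeps.keySet());
        Set<Integer> produced = new HashSet<>();
        while (!remaining.isEmpty()) {
            List<Long> round = new ArrayList<>();
            for (Long frag : remaining) {
                if (produced.containsAll(inputDeps.get(frag))) {
                    round.add(frag);                        // all of its inputs are already available
                }
            }
            if (round.isEmpty()) {
                throw new IllegalStateException("dependency cycle");  // cannot happen for a valid plan
            }
            remaining.removeAll(round);
            for (Long frag : round) {
                produced.add(outputDep.get(frag));          // its output becomes available to later rounds
            }
            rounds.add(round);
        }
        return rounds;
    }
}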

After generating the WorkFragments needed for the entire SQLStmt batch, the batch plan is handed off to the PartitionExecutor, which schedules the dispatching and execution of the WorkFragments in the cluster.

Dispatching & Executing WorkFragments

The main logic for controlling how the WorkFragments are distributed to the appropriate partitions at runtime is found in PartitionExecutor.dispatchWorkFragments(). The high-level idea of this method is that it first submits all of the WorkFragments for the first round that can be executed immediately (i.e., they do not have any input dependencies) and then blocks until their output dependencies arrive back for the transaction. It is then awoken when all of the input dependencies for the next round have arrived and that round is eligible for execution. This process repeats until there are no more WorkFragment rounds to dispatch and all of the output dependencies for the outstanding rounds have arrived.
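
At a very high level, the loop can be pictured as the sketch below. The types are illustrative; the real PartitionExecutor.dispatchWorkFragments() operates on WorkFragment builders and handles prefetching and several other cases not shown here.

import java.util.*;
import java.util.concurrent.*;

// Sketch of the round-based dispatch loop: send out one round, block until all of its
// results are back, then move on to the next round.
public class DispatchLoopSketch {
    interface Dispatcher { void send(long fragmentId); }    // stand-in for the path through the HStoreCoordinator

    static void dispatchRounds(List<List<Long>> rounds,
                               Dispatcher dispatcher,
                               BlockingQueue<Long> completedFragments) throws InterruptedException {
        for (List<Long> round : rounds) {
            for (long fragmentId : round) {
                dispatcher.send(fragmentId);                // submit every fragment in this round
            }
            Set<Long> outstanding = new HashSet<>(round);
            while (!outstanding.isEmpty()) {
                // Block until a result arrives for one of the outstanding fragments
                outstanding.remove(completedFragments.take());
            }
            // All output dependencies for this round have arrived; the next round is now eligible
        }
    }
}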

There is one WorkFragment per partition in each round. All of the WorkFragments for partitions on the same Site are combined into a single TransactionWorkRequest that is sent over the network using the HStoreCoordinator. The HStoreCoordinator invokes the PartitionExecutor’s internal callback handle (PartitionExecutor.request_work_callback) to notify it when the corresponding TransactionWorkResult is returned for that request.
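
The grouping step can be pictured as follows (illustrative names; the real code assembles one Protocol Buffer TransactionWorkRequest per Site from the WorkFragment builders):

import java.util.*;

// Sketch: collect the per-partition fragments of a round into one bucket per Site,
// so that each Site receives a single request over the network.
public class SiteGroupingSketch {
    static Map<Integer, List<Long>> groupBySite(Map<Long, Integer> fragmentToPartition,
                                                Map<Integer, Integer> partitionToSite) {
        Map<Integer, List<Long>> perSite = new HashMap<>();
        fragmentToPartition.forEach((fragmentId, partition) -> {
            int site = partitionToSite.get(partition);      // the Site that hosts this partition
            perSite.computeIfAbsent(site, s -> new ArrayList<>()).add(fragmentId);
        });
        return perSite;
    }
}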

On remote partitions, when the HStoreCoordinator receives a TransactionWorkRequest for an active transaction, it wraps the embedded Protocol Buffer WorkFragments into a WorkFragmentMessage and queues them in their target PartitionExecutor. The PartitionExecutor retrieves this WorkFragmentMessage, executes the corresponding PlanFragments in its ExecutionEngine, and then invokes the corresponding callback to transmit the results back to the transaction’s base partition.