MapReduce Transactions

This document describes how to use the new, experimental MapReduce-style stored procedures to execute distributed analytical (OLAP) queries in H-Store.

Overview

MapReduce-style transactions allow H-Store to execute analytical queries that access the entire database without incurring the cost of distributed transaction coordination. When executing a normal distributed transaction (i.e., a transaction that needs to access multiple partitions), H-Store blocks other transactions that are running or waiting in the queue on the other partitions and sends all the data it needs to the base partition. This has been shown to have a significant impact on the throughput of the whole system. With H-Store’s MapReduce transactions, the transaction is split into many single-partition transactions that execute independently and in parallel on all partitions. Although a MapReduce transaction is invoked as a distributed transaction that blocks all partitions, the PartitionExecutor continues to execute non-MapReduce single-partition transactions, and this has been shown to give better performance (see Evaluation).

When a MapReduce transaction is invoked, the following sequence of phases occurs:

  1. Map:
    The transaction running at the base partition notifies all other partitions to start the Map phase. The ExecutionSite at each partition then invokes a single-partition transaction that executes the MapInputQuery to retrieve data from its local storage. These transactions run separately and do not need to coordinate with each other. The retrieved records are then passed to the Map() method of the MapReduce stored procedure.

  2. Shuffle:
    After a single-partition Map transaction finishes, it automatically begins the Shuffle phase in a separate, non-blocking thread. Rows in the MapOutputTable that share the same key, as defined in the MapReduce stored procedure, are sent to the same destination (i.e., partition), where they become the input of the Reduce() method.

  3. Reduce:
    After all the data has been sent to its destination, the transaction moves to the Reduce phase: the transaction running at the base partition notifies all other partitions to begin the Reduce phase. This works like the Map phase and starts once the ReduceInputTable is ready. Tuples in the ReduceInputTable are sorted by key, which is the first column by default. Tuples with the same key are then passed to the Reduce() method of the MapReduce stored procedure. The output of the Reduce() method at each partition is coalesced at the transaction’s base partition and sent back to the client.

Note that H-Store automatically partitions the shuffle data using the key of the Map’s emit table, which by default is its first column.
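The three phases above can be sketched with plain Java collections. This is only an illustrative, single-process simulation and not H-Store code: the class MapReduceFlowSketch, the Row type, and the partitionFor() routing rule are hypothetical names introduced here, and H-Store's actual hash function may differ. It routes each row by its key, keeps each partition's reduce input sorted by key, and coalesces the per-partition results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class MapReduceFlowSketch {

    /** One tuple: column 0 is the key, column 1 is a counter. */
    static final class Row {
        final String key;
        final long count;

        Row(String key, long count) {
            this.key = key;
            this.count = count;
        }
    }

    /** Illustrative routing rule (assumption): hash of the key modulo the partition count. */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Simulates Map, Shuffle, and Reduce over in-memory "partitions". */
    static Map<String, Long> run(List<List<Row>> partitions) {
        int n = partitions.size();

        // One reduce-input table per partition; TreeMap keeps the keys sorted.
        List<TreeMap<String, List<Row>>> reduceInput = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            reduceInput.add(new TreeMap<>());
        }

        // Map + Shuffle: every partition reads only its local rows and sends
        // each emitted row to the partition selected by the row's key.
        for (List<Row> localRows : partitions) {
            for (Row row : localRows) {
                reduceInput.get(partitionFor(row.key, n))
                           .computeIfAbsent(row.key, k -> new ArrayList<>())
                           .add(row);
            }
        }

        // Reduce: each partition folds the rows of one key at a time, and the
        // per-partition outputs are coalesced into a single result.
        Map<String, Long> coalesced = new TreeMap<>();
        for (TreeMap<String, List<Row>> input : reduceInput) {
            for (Map.Entry<String, List<Row>> group : input.entrySet()) {
                long total = 0;
                for (Row row : group.getValue()) {
                    total += row.count;
                }
                coalesced.put(group.getKey(), total);
            }
        }
        return coalesced;
    }

    public static void main(String[] args) {
        List<List<Row>> partitions = new ArrayList<>();
        partitions.add(Arrays.asList(new Row("alice", 2), new Row("bob", 1)));
        partitions.add(Arrays.asList(new Row("alice", 3)));
        System.out.println(run(partitions)); // prints {alice=5, bob=1}
    }
}
```

Because every row with a given key lands on exactly one partition, no key is reduced twice, which is why the coalescing step can simply merge the per-partition outputs.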

MapReduce API

Just as with regular transactions in H-Store, all MapReduce transactions must be pre-defined as stored procedures in the benchmarks. To create a new MapReduce stored procedure, create a new Java class that extends the VoltMapReduceProcedure abstract class. Users must implement its two abstract methods, map and reduce, which are described below. Each implementation of VoltMapReduceProcedure needs to include the following six components:

  1. ProcInfo:
    The class definition must be preceded by a ProcInfo annotation that sets mapInputQuery. The system reads this name and matches it against the user-defined query described next. It should be declared in the MapReduce stored procedure as follows:

    @ProcInfo(
        mapInputQuery = "mapInputQuery"
    )
  2. MapInputQuery:
    This is the query that is executed when the transaction starts and provides the input data to the Map method. The input parameters to this query are used as the input parameters to the transaction. This query is always executed as a local, single-partition query on each partition in the cluster that is executing the Map phase for the transaction. The MapInputQuery supplies the input data for the MapReduce job, so it must not be null. It is written as a SQL statement, for example:

    public SQLStmt mapInputQuery = new SQLStmt(
        "SELECT A_NAME, COUNT(*) FROM TABLEA WHERE A_AGE >= ? GROUP BY A_NAME" // the "?" is the input parameter
    );
  3. VoltTable.ColumnInfo[] getMapOutputSchema():
    This defines the schema of the Map method’s output table, which is used as the input to the Reduce method. By default, the data is automatically sent to a particular partition for the Reduce phase based on the hash value of the first column in the output table. The MapOutput table schema can be defined as follows:

    @Override
    public VoltTable.ColumnInfo[] getMapOutputSchema() {
        return new VoltTable.ColumnInfo[]{
            // this is the key that will be hashed by default, the key type here is String
            new VoltTable.ColumnInfo("NAME", VoltType.STRING), 
            new VoltTable.ColumnInfo("COUNTER", VoltType.BIGINT),
        };
    }
  4. Map(VoltTableRow row):
    Map() is invoked for each record (i.e., tuple) returned by the MapInputQuery. It performs some unit of processing on that row and can pass output on for further processing by the Reduce method by invoking VoltMapReduceProcedure.mapEmit(). This output is passed to the Shuffle phase after the Map finishes. The Map function for a name-counter MapReduce job can be defined as follows:

    @Override
    public void map(VoltTableRow row) {
        String key = row.getString(0); // the key is read from column 0 by default
        Object new_row[] = { key, row.getLong(1) };  // this row will be inserted into the mapOutput table
        this.mapEmit(key, new_row); // emit the intermediate data to the mapOutput table
    }
  5. VoltTable.ColumnInfo[] getReduceOutputSchema():
    This is very similar to getMapOutputSchema(). It defines the schema of the Reduce method’s output table, which is sent back to the client. In other jobs the two schemas can differ considerably, although they are essentially the same here.

    @Override
    public VoltTable.ColumnInfo[] getReduceOutputSchema() {
        return new VoltTable.ColumnInfo[]{
            // The first column is the key and should match the key in the MapOutputSchema
            new VoltTable.ColumnInfo("NAME", VoltType.STRING),
            new VoltTable.ColumnInfo("COUNTER", VoltType.BIGINT),
        };
    }
  6. Reduce(Key k, Iterator<VoltTableRow> row):
    Reduce() is invoked at each partition to process the output data of the Shuffle phase. Users do not need to be concerned with the shuffle or with how the data is sent internally. The system prepares the input of the reduce function so that all tuples with the same key can be accessed through the Iterator. The Iterator’s remove() method does nothing. As the example shows, the Iterator is wrapped with CollectionUtil.iterable() so that it can be used in a for-each loop.

    @Override
    public void reduce(String key, Iterator<VoltTableRow> rows) {
        long count = 0;
        for (VoltTableRow r : CollectionUtil.iterable(rows)) {
            count += r.getLong(1); // add up the counters emitted by the Map phase
        } // FOR
        Object new_row[] = { key, count };
        this.reduceEmit(new_row); // emit the row to the reduceOutput table
    }
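To make the iterator handling in reduce() more concrete, here is a minimal sketch of how an Iterator can be adapted for a for-each loop and how a remove() that does nothing might be written. It is not H-Store's implementation: IteratorWrapSketch, iterable(), and noRemove() are hypothetical names, and the iterable() helper only assumes that CollectionUtil.iterable() behaves in a similar way.

```java
import java.util.Arrays;
import java.util.Iterator;

class IteratorWrapSketch {

    /** Adapts an iterator for use in a for-each loop (usable only once). */
    static <T> Iterable<T> iterable(final Iterator<T> it) {
        return new Iterable<T>() {
            @Override
            public Iterator<T> iterator() {
                return it;
            }
        };
    }

    /** Wraps an iterator so that remove() silently does nothing. */
    static <T> Iterator<T> noRemove(final Iterator<T> it) {
        return new Iterator<T>() {
            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public T next() {
                return it.next();
            }

            @Override
            public void remove() {
                // intentionally a no-op: the underlying data is left untouched
            }
        };
    }

    /** Sums the values the same way the reduce() example sums its counters. */
    static long sum(Iterator<Long> values) {
        long total = 0;
        for (long v : iterable(values)) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        Iterator<Long> rows = noRemove(Arrays.asList(2L, 3L, 4L).iterator());
        System.out.println(sum(rows)); // prints 9
    }
}
```

Note that an Iterable built this way hands out the same underlying iterator every time, so it should be looped over only once.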

Last but not least, the iterator deserves a more detailed discussion. This part will be added later.

Finally, a very simple MapReduce stored procedure class called MockMapReduce, which implements the name counter, is available for reference. It is the actual demo implementation of a simple MapReduce stored procedure.

Evaluation

  1. The tested distributed query, Query 1, and the figure with the test results are shown next.
    SELECT ol_number,SUM(ol_quantity),
    SUM(ol_amount),AVG(ol_quantity),
    AVG(ol_amount),COUNT(*)
    FROM order_line
    GROUP BY ol_number
    ORDER BY ol_number

    Query tested for MapReduce transaction

  2. This query is a good example showing that a MapReduce transaction performs better than a normal H-Store distributed transaction. The X axis of the figure is the H-Store deployment and the Y axis is the execution time of Query 1 in milliseconds.

    It is clear that the MapReduce transaction maintains better performance as the number of partitions increases. Doubling the number of partitions does not mean that the input data doubles. However, we can see that the performance of the MapReduce transaction does not degrade as much as that of the normal distributed transaction. A normal distributed transaction blocks other transactions on the other partitions and sends the data it needs to the base partition to perform the aggregate operations, which takes a considerable amount of time. MapReduce instead treats the work as many single-partition transactions, one executed on each partition.

    After the shuffle phase, the data is well distributed across the partitions, so each partition performs its part of the aggregate operation locally. The task is thus divided among all partitions instead of being done at a single base partition. This is why the MapReduce transaction achieves better performance across the cluster.

    More interesting evaluation work will be published soon (contact me if you are interested: xin at cs.brown.edu)…
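To illustrate how the aggregates of Query 1 can be finished locally after the shuffle, here is a minimal sketch of a reduce step for one ol_number. It assumes that the Map phase on each partition emits partial SUM and COUNT values per key; that design, the class PartialAggregateSketch, and the Partial type are hypothetical and are not taken from the H-Store code base.

```java
import java.util.Arrays;
import java.util.Iterator;

class PartialAggregateSketch {

    /** Partial aggregate for one ol_number, computed on one partition (assumed layout). */
    static final class Partial {
        final long sumQuantity;
        final double sumAmount;
        final long rowCount;

        Partial(long sumQuantity, double sumAmount, long rowCount) {
            this.sumQuantity = sumQuantity;
            this.sumAmount = sumAmount;
            this.rowCount = rowCount;
        }
    }

    /**
     * Combines all partials of one key into
     * {SUM(ol_quantity), SUM(ol_amount), AVG(ol_quantity), AVG(ol_amount), COUNT(*)}.
     */
    static double[] reduce(Iterator<Partial> partials) {
        long sumQuantity = 0;
        double sumAmount = 0.0;
        long rowCount = 0;
        while (partials.hasNext()) {
            Partial p = partials.next();
            sumQuantity += p.sumQuantity;
            sumAmount += p.sumAmount;
            rowCount += p.rowCount;
        }
        // Averages are derived from the combined sums and counts; averaging
        // per-partition averages would give the wrong result when the
        // partitions hold different numbers of rows.
        double avgQuantity = rowCount == 0 ? 0.0 : (double) sumQuantity / rowCount;
        double avgAmount = rowCount == 0 ? 0.0 : sumAmount / rowCount;
        return new double[] { sumQuantity, sumAmount, avgQuantity, avgAmount, rowCount };
    }

    public static void main(String[] args) {
        Iterator<Partial> partials = Arrays.asList(
            new Partial(10, 100.0, 2),  // partial result from one partition
            new Partial(20, 50.0, 3)    // partial result from another partition
        ).iterator();
        System.out.println(Arrays.toString(reduce(partials)));
        // prints [30.0, 150.0, 6.0, 30.0, 5.0]
    }
}
```

Since the shuffle sends all partials of a key to the same partition, each reducer can produce the final row for its keys without contacting the base partition.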

Future Work

  1. It would be nice to run queries that JOIN two or more tables and perform many aggregate operations. Right now either the H-Store system does not support this or I am not sure how to express it in the H-Store way. Such a query might look like this:
    SELECT ol_number, SUM(ol_amount), AVG(ol_quantity)
    FROM order_line, item
    WHERE order_line.ol_i_id = item.i_id
    GROUP BY ol_number ORDER BY ol_number
  2. We would really like to know how the scale of the input data affects the performance of these two kinds of transactions. By increasing the cluster scale together with the data input, I may be able to test and prove the first conclusion in the summary.
  3. Measure the effect on TPC-C throughput instead of the execution time of these two kinds of transactions.
  4. More details about the project can be found on this page: github H-Store