Классика баз данных - статьи



         

Интерфейс программирования


В этом подразделе мы опишем интерфейс программирования. На рис. 6 показан Java-класс, реализующий SQL/MR-функцию sessionize (для нашего сквозного примера формирования пользовательских сессий).

public class Sessionize implements PartitionFunction { // Constructor (called at initialization)

public Sessionize (RuntimeContract contract) { InputInfo inputInfo = contract.get InputInfo() ;

// Determine time column

String timeColumnName = contract.useArgumentClause(”timecolumn”).getSingleValue(); timeColumnIndex_ = inputInfo.getColumnIndex(timeColumnName);

// Determine timeout

String timeoutValue = contract.useArgumentClause(”timeout”).getSingleValue(); timeout_ = Integer.parseInt(timeoutValue);

// Define output columns

List outputColumns = new ArrayList(); outputColumns.addAll(inputInfo.getColumns()); outputColumns.add(new ColumnDefinition(”sessionid”, SqlType.integer()));

// Complete the contract

contract.setOutputInfo( new OutputInfo(outputColumns)); contract.complete(); }

// Operate method ( called at runtime, for each partition)

public void operateOnPartition( PartitionDefinition partition, RowIterator inputIterator, // Iterates over all rows in the partition

RowEmitter outputEmitter // Used to emit output rows

) { int currentSessionId = 0 ; int lastTime = Integer.MIN_VALUE;

// Advance through each row in partition

while (inputIterator.advanceToNextRow()) { // Determine if time of this click is more than timeout after the last

int currentTime = inputIterator.getIntAt(timeColumnIndex_); if(currentTime > lastTime + timeout_) ++currentSessionId; // Emit ouput row with all input columns, plus current session id

outputEmitter.addFromRow(inputIterator); outputEmitter.addInt(currentSessionId); outputEmitter.emitRow(); lastTime = currentTime; } }

// State saved at initialization, used during runtime

private int timeColumnIndex_; private int timeout_; };

Рис. 6. Реализация повторно используемой функции sessionize с использованием Java API SQL/MR.




Содержание  Назад  Вперед