mirror of
https://github.com/fluencelabs/llamadb
synced 2025-05-29 07:21:24 +00:00
Add Group trait, used to yield rows and perform aggregate functions.
This commit is contained in:
parent
71eda59e5c
commit
3da60e6c29
@ -4,5 +4,11 @@ pub trait DatabaseStorage {
|
|||||||
type Info: DatabaseInfo;
|
type Info: DatabaseInfo;
|
||||||
|
|
||||||
fn scan_table<'a>(&'a self, table: &'a <Self::Info as DatabaseInfo>::Table)
|
fn scan_table<'a>(&'a self, table: &'a <Self::Info as DatabaseInfo>::Table)
|
||||||
-> Box<Iterator<Item=Box<[<Self::Info as DatabaseInfo>::ColumnValue]>> + 'a>;
|
-> Box<Group<ColumnValue=<Self::Info as DatabaseInfo>::ColumnValue> + 'a>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait Group {
|
||||||
|
type ColumnValue: Sized + 'static;
|
||||||
|
|
||||||
|
fn iter<'a>(&'a self) -> Box<Iterator<Item=Box<[Self::ColumnValue]>> + 'a>;
|
||||||
}
|
}
|
||||||
|
@ -1,24 +1,45 @@
|
|||||||
use columnvalueops::ColumnValueOps;
|
use columnvalueops::ColumnValueOps;
|
||||||
use databaseinfo::DatabaseInfo;
|
use databaseinfo::DatabaseInfo;
|
||||||
use databasestorage::DatabaseStorage;
|
use databasestorage::{DatabaseStorage, Group};
|
||||||
use super::sexpression::{BinaryOp, SExpression};
|
use super::sexpression::{BinaryOp, SExpression};
|
||||||
|
|
||||||
|
enum SourceType<'a, ColumnValue: Sized + 'static> {
|
||||||
|
Row(&'a [ColumnValue]),
|
||||||
|
Group(&'a Group<ColumnValue=ColumnValue>)
|
||||||
|
}
|
||||||
|
|
||||||
struct Source<'a, ColumnValue: Sized + 'static> {
|
struct Source<'a, ColumnValue: Sized + 'static> {
|
||||||
parent: Option<&'a Source<'a, ColumnValue>>,
|
parent: Option<&'a Source<'a, ColumnValue>>,
|
||||||
source_id: u32,
|
source_id: u32,
|
||||||
row: &'a [ColumnValue],
|
source_type: SourceType<'a, ColumnValue>
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, ColumnValue: Sized> Source<'a, ColumnValue> {
|
impl<'a, ColumnValue: Sized> Source<'a, ColumnValue> {
|
||||||
fn find_row_from_source_id(&self, source_id: u32) -> Option<&[ColumnValue]> {
|
fn find_row_from_source_id(&self, source_id: u32) -> Option<&[ColumnValue]> {
|
||||||
if self.source_id == source_id {
|
if self.source_id == source_id {
|
||||||
Some(self.row)
|
match &self.source_type {
|
||||||
|
&SourceType::Row(row) => Some(row),
|
||||||
|
_ => None
|
||||||
|
}
|
||||||
} else if let Some(parent) = self.parent {
|
} else if let Some(parent) = self.parent {
|
||||||
parent.find_row_from_source_id(source_id)
|
parent.find_row_from_source_id(source_id)
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn find_group_from_source_id(&self, source_id: u32) -> Option<&Group<ColumnValue=ColumnValue>> {
|
||||||
|
if self.source_id == source_id {
|
||||||
|
match &self.source_type {
|
||||||
|
&SourceType::Group(group) => Some(group),
|
||||||
|
_ => None
|
||||||
|
}
|
||||||
|
} else if let Some(parent) = self.parent {
|
||||||
|
parent.find_group_from_source_id(source_id)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The query plan is currently defined as a recursive language.
|
/// The query plan is currently defined as a recursive language.
|
||||||
@ -55,11 +76,12 @@ where <Storage::Info as DatabaseInfo>::Table: 'a
|
|||||||
{
|
{
|
||||||
match expr {
|
match expr {
|
||||||
&SExpression::Scan { table, source_id, ref yield_fn } => {
|
&SExpression::Scan { table, source_id, ref yield_fn } => {
|
||||||
for row in self.storage.scan_table(table) {
|
let group = self.storage.scan_table(table);
|
||||||
|
for row in group.iter() {
|
||||||
let new_source = Source {
|
let new_source = Source {
|
||||||
parent: source,
|
parent: source,
|
||||||
source_id: source_id,
|
source_id: source_id,
|
||||||
row: &row
|
source_type: SourceType::Row(&row)
|
||||||
};
|
};
|
||||||
|
|
||||||
try!(self.execute(yield_fn, result_cb, Some(&new_source)));
|
try!(self.execute(yield_fn, result_cb, Some(&new_source)));
|
||||||
@ -72,7 +94,7 @@ where <Storage::Info as DatabaseInfo>::Table: 'a
|
|||||||
let new_source = Source {
|
let new_source = Source {
|
||||||
parent: source,
|
parent: source,
|
||||||
source_id: source_id,
|
source_id: source_id,
|
||||||
row: row
|
source_type: SourceType::Row(row)
|
||||||
};
|
};
|
||||||
|
|
||||||
self.execute(yield_out_fn, result_cb, Some(&new_source))
|
self.execute(yield_out_fn, result_cb, Some(&new_source))
|
||||||
@ -152,7 +174,7 @@ where <Storage::Info as DatabaseInfo>::Table: 'a
|
|||||||
let new_source = Source {
|
let new_source = Source {
|
||||||
parent: source,
|
parent: source,
|
||||||
source_id: source_id,
|
source_id: source_id,
|
||||||
row: &row
|
source_type: SourceType::Row(&row)
|
||||||
};
|
};
|
||||||
|
|
||||||
self.resolve_value(yield_out_fn, Some(&new_source))
|
self.resolve_value(yield_out_fn, Some(&new_source))
|
||||||
|
@ -8,7 +8,7 @@ use std::collections::BTreeSet;
|
|||||||
|
|
||||||
use columnvalueops::ColumnValueOps;
|
use columnvalueops::ColumnValueOps;
|
||||||
use databaseinfo::{DatabaseInfo, TableInfo, ColumnInfo};
|
use databaseinfo::{DatabaseInfo, TableInfo, ColumnInfo};
|
||||||
use databasestorage::DatabaseStorage;
|
use databasestorage::{Group, DatabaseStorage};
|
||||||
use identifier::Identifier;
|
use identifier::Identifier;
|
||||||
use types::{DbType, Variant};
|
use types::{DbType, Variant};
|
||||||
use sqlsyntax::ast;
|
use sqlsyntax::ast;
|
||||||
@ -41,12 +41,15 @@ impl DatabaseInfo for TempDb {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DatabaseStorage for TempDb {
|
struct ScanGroup<'a> {
|
||||||
type Info = TempDb;
|
table: &'a Table
|
||||||
|
}
|
||||||
|
|
||||||
fn scan_table<'a>(&'a self, table: &'a Table)
|
impl<'a> Group for ScanGroup<'a> {
|
||||||
-> Box<Iterator<Item=Box<[Variant]>> + 'a>
|
type ColumnValue = Variant;
|
||||||
{
|
|
||||||
|
fn iter<'b>(&'b self) -> Box<Iterator<Item=Box<[Variant]>> + 'b> {
|
||||||
|
let table = self.table;
|
||||||
let columns: &'a [self::table::Column] = &table.columns;
|
let columns: &'a [self::table::Column] = &table.columns;
|
||||||
|
|
||||||
Box::new(table.rowid_index.iter().map(move |key_v| {
|
Box::new(table.rowid_index.iter().map(move |key_v| {
|
||||||
@ -95,6 +98,18 @@ impl DatabaseStorage for TempDb {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl DatabaseStorage for TempDb {
|
||||||
|
type Info = TempDb;
|
||||||
|
|
||||||
|
fn scan_table<'a>(&'a self, table: &'a Table)
|
||||||
|
-> Box<Group<ColumnValue=Variant> + 'a>
|
||||||
|
{
|
||||||
|
Box::new(ScanGroup {
|
||||||
|
table: table
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl TempDb {
|
impl TempDb {
|
||||||
pub fn new() -> TempDb {
|
pub fn new() -> TempDb {
|
||||||
TempDb {
|
TempDb {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user